@@ -25,6 +25,7 @@ import org.apache.hadoop.fs.Path
 import org.apache.spark.sql.{CometTestBase, DataFrame, Row}
 import org.apache.spark.sql.catalyst.optimizer.EliminateSorts
 import org.apache.spark.sql.comet.CometHashAggregateExec
+import org.apache.spark.sql.comet.execution.shuffle.CometShuffleExchangeExec
 import org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanHelper
 import org.apache.spark.sql.functions.{count_distinct, sum}
 import org.apache.spark.sql.internal.SQLConf
@@ -89,6 +90,37 @@ class CometAggregateSuite extends CometTestBase with AdaptiveSparkPlanHelper {
     }
   }
 
+  // based on Spark's SQLWindowFunctionSuite test of the same name
+  test("window function: partition and order expressions") {
+    for (shuffleMode <- Seq("auto", "native", "jvm")) {
+      withSQLConf(CometConf.COMET_SHUFFLE_MODE.key -> shuffleMode) {
+        val df =
+          Seq((1, "a", 5), (2, "a", 6), (3, "b", 7), (4, "b", 8), (5, "c", 9), (6, "c", 10)).toDF(
+            "month",
+            "area",
+            "product")
+        df.createOrReplaceTempView("windowData")
+        val df2 = sql("""
+            |select month, area, product, sum(product + 1) over (partition by 1 order by 2)
+            |from windowData
+          """.stripMargin)
+        checkSparkAnswer(df2)
+        val cometShuffles = collect(df2.queryExecution.executedPlan) {
+          case _: CometShuffleExchangeExec => true
+        }
+        if (shuffleMode == "jvm") {
+          assert(cometShuffles.length == 1)
+        } else {
+          // we fall back to Spark for shuffle because we do not support
+          // native shuffle with a LocalTableScan input, and we do not fall
+          // back to Comet columnar shuffle due to
+          // https://github.com/apache/datafusion-comet/issues/1248
+          assert(cometShuffles.isEmpty)
+        }
+      }
+    }
+  }
+
   test("multiple column distinct count") {
     withSQLConf(
       CometConf.COMET_ENABLED.key -> "true",