Wrong aggregation result in Spark SQL tests after enabling columnar shuffle #260

@viirya

Description

Describe the bug

While trying to enable columnar shuffle by default, I found that some Spark SQL tests fail. Some of them produce wrong aggregate results, e.g.

SQLQuerySuite: SPARK-8828 sum should return null if all input values are null

[info]   == Physical Plan ==                                                                                                                                                                                                           
[info]   AdaptiveSparkPlan isFinalPlan=true                                                                                                                                                                                            
[info]   +- == Final Plan ==                                                                                                                                                                                                           
[info]      *(2) ColumnarToRow                                                                                                                                                                                                         
[info]      +- CometHashAggregate [sum#3766L, sum#3767, count#3768L], Final, [sum(a#136), avg(a#136)]                                                                                                                                  
[info]         +- ShuffleQueryStage 0
[info]            +- CometColumnarExchange SinglePartition, ENSURE_REQUIREMENTS, CometColumnarShuffle, [plan_id=5228]
[info]               +- RowToColumnar
[info]                  +- *(1) HashAggregate(keys=[], functions=[partial_sum(a#136), partial_avg(a#136)], output=[sum#3766L, sum#3767, count#3768L])
[info]                     +- *(1) SerializeFromObject [knownnotnull(assertnotnull(input[0, org.apache.spark.sql.test.SQLTestData$NullInts, true])).a.intValue AS a#136]
[info]                        +- Scan[obj#135]
[info]   +- == Initial Plan ==
[info]      CometHashAggregate [sum#3766L, sum#3767, count#3768L], Final, [sum(a#136), avg(a#136)]
[info]      +- CometColumnarExchange SinglePartition, ENSURE_REQUIREMENTS, CometColumnarShuffle, [plan_id=5117]
[info]         +- HashAggregate(keys=[], functions=[partial_sum(a#136), partial_avg(a#136)], output=[sum#3766L, sum#3767, count#3768L])
[info]            +- SerializeFromObject [knownnotnull(assertnotnull(input[0, org.apache.spark.sql.test.SQLTestData$NullInts, true])).a.intValue AS a#136]
[info]               +- Scan[obj#135]
...
[info]   == Results ==
[info]   !== Correct Answer - 1 ==   == Spark Answer - 1 ==
[info]   !struct<>                   struct<sum(a):bigint,avg(a):double>
[info]   ![null,null]                [null,NaN] (QueryTest.scala:243)
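For reference, the expected answer here follows standard SQL semantics: SUM and AVG over an input containing only NULLs must return NULL, not NaN. A minimal illustration using SQLite as a stand-in (assuming its ANSI-style NULL handling for aggregates, which matches what the test expects of Spark):

```python
import sqlite3

# Standard SQL: aggregating a column that contains only NULLs yields NULL
# for both SUM and AVG (not 0, and not NaN).
con = sqlite3.connect(":memory:")
con.execute("CREATE TABLE t (a INTEGER)")
con.executemany("INSERT INTO t VALUES (?)", [(None,), (None,), (None,)])

row = con.execute("SELECT SUM(a), AVG(a) FROM t").fetchone()
print(row)  # (None, None) -- i.e. [null,null], not the [null,NaN] seen above
```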

aggregation with codegen:

[info]   == Physical Plan ==
[info]   AdaptiveSparkPlan isFinalPlan=true
[info]   +- == Final Plan ==
[info]      *(2) ColumnarToRow
[info]      +- CometHashAggregate [sum#4362, sum#4363, count#4364L], Final, [sum(null), avg(null)]
[info]         +- ShuffleQueryStage 0
[info]            +- CometColumnarExchange SinglePartition, ENSURE_REQUIREMENTS, CometColumnarShuffle, [plan_id=10168]
[info]               +- RowToColumnar
[info]                  +- *(1) HashAggregate(keys=[], functions=[partial_sum(null), partial_avg(null)], output=[sum#4362, sum#4363, count#4364L])
[info]                     +- *(1) SerializeFromObject
[info]                        +- Scan[obj#12]
[info]   +- == Initial Plan ==
[info]      CometHashAggregate [sum#4362, sum#4363, count#4364L], Final, [sum(null), avg(null)]
[info]      +- CometColumnarExchange SinglePartition, ENSURE_REQUIREMENTS, CometColumnarShuffle, [plan_id=10060]
[info]         +- HashAggregate(keys=[], functions=[partial_sum(null), partial_avg(null)], output=[sum#4362, sum#4363, count#4364L])
[info]            +- SerializeFromObject
[info]               +- Scan[obj#12]
[info]   
[info]   == Results ==
[info]   !== Correct Answer - 1 ==   == Spark Answer - 1 ==
[info]   !struct<>                   struct<sum(a):double,avg(a):double,count(NULL):bigint>
[info]   ![null,null,0]              [null,NaN,0] (QueryTest.scala:243)
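One plausible source of the NaN (an assumption on my part, not confirmed from the code): the partial state for `avg` is a `(sum, count)` pair, visible as `sum#4363, count#4364L` in the plan above, and the final step must return null when the count is 0 rather than dividing. A sketch with a hypothetical `final_avg` helper:

```python
def final_avg(partial_sum: float, partial_count: int):
    """Final step of a partial/final AVG split over the (sum, count) state.

    With no non-null input rows the partial state is (0.0, 0). A correct
    final step returns None (SQL NULL); an implementation that divides
    unconditionally with IEEE 754 doubles would produce 0.0/0.0 = NaN,
    which matches the wrong [null,NaN] answer above.
    """
    if partial_count == 0:
        return None  # SQL NULL: no rows contributed to the average
    return partial_sum / partial_count

print(final_avg(0.0, 0))  # None (expected answer: null)
print(final_avg(6.0, 3))  # 2.0
```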

SPARK-3176 Added Parser of SQL LAST():

[info]   == Physical Plan ==
[info]   AdaptiveSparkPlan isFinalPlan=true
[info]   +- == Final Plan ==
[info]      *(2) ColumnarToRow
[info]      +- CometHashAggregate [last#4396, valueSet#4397], Final, [last(n#93, false)]
[info]         +- ShuffleQueryStage 0
[info]            +- CometColumnarExchange SinglePartition, ENSURE_REQUIREMENTS, CometColumnarShuffle, [plan_id=10390]
[info]               +- RowToColumnar
[info]                  +- *(1) HashAggregate(keys=[], functions=[partial_last(n#93, false)], output=[last#4396, valueSet#4397])
[info]                     +- *(1) SerializeFromObject [knownnotnull(assertnotnull(input[0, org.apache.spark.sql.test.SQLTestData$LowerCaseData, true])).n AS n#93]
[info]                        +- Scan[obj#92]
[info]   +- == Initial Plan ==
[info]      CometHashAggregate [last#4396, valueSet#4397], Final, [last(n#93, false)]
[info]      +- CometColumnarExchange SinglePartition, ENSURE_REQUIREMENTS, CometColumnarShuffle, [plan_id=10279]
[info]         +- HashAggregate(keys=[], functions=[partial_last(n#93, false)], output=[last#4396, valueSet#4397])
[info]            +- SerializeFromObject [knownnotnull(assertnotnull(input[0, org.apache.spark.sql.test.SQLTestData$LowerCaseData, true])).n AS n#93]
[info]               +- Scan[obj#92]
[info]   
[info]   == Results ==
[info]   !== Correct Answer - 1 ==   == Spark Answer - 1 ==
[info]   !struct<>                   struct<last(n):int>
[info]   ![4]                        [2] (QueryTest.scala:243)
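The LAST() failure looks order-related rather than null-related: `last(n#93, false)` is split into a `partial_last` per map task and a final `last` over the partial states, so the answer depends on the order in which the shuffle delivers partitions. A sketch (hypothetical helpers, not Comet code) of how a reordering shuffle can flip 4 into 2:

```python
def partial_last(rows):
    # Each map task keeps only the last value it sees (ignoreNulls = false).
    last = None
    for v in rows:
        last = v
    return last

def final_last(partial_states):
    # The final stage keeps the last partial state it receives, so the
    # result depends on the order partitions arrive from the shuffle.
    last = None
    for p in partial_states:
        last = p
    return last

partitions = [[1, 2], [3, 4]]
partials = [partial_last(p) for p in partitions]  # [2, 4]
print(final_last(partials))        # 4 -- the order the test expects
print(final_last(partials[::-1]))  # 2 -- the wrong answer above, if the
                                   # columnar shuffle reorders partitions
```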

Steps to reproduce

No response

Expected behavior

No response

Additional context

No response

Metadata

Labels

bug (Something isn't working)