[VL] Result mismatch in CollectList when partial sort is involved #8184

@NEUpanning

Description

Backend

VL (Velox)

Bug description

Reproducing SQL:

CREATE OR REPLACE TEMP VIEW temp_table AS
SELECT * FROM VALUES
  (1, 'a'), (1, 'b'), (1, 'c'),
  (2, 'd'), (2, 'e'), (2, 'f'),
  (3, 'g'), (3, 'h'), (3, 'i')
AS t(id, value);

SELECT id, collect_list(value) AS values_list
FROM (
  SELECT * FROM
  (SELECT id, value
   FROM temp_table
   DISTRIBUTE BY rand())  -- Forces a shuffle
  DISTRIBUTE BY id SORT BY id, value
) t
GROUP BY id;

Results:

  • The vanilla result is deterministic and values_list is sorted by the value column:

    id   values_list
    1    ["a", "b", "c"]
    2    ["d", "e", "f"]
    3    ["g", "h", "i"]
    
  • The gluten result is non-deterministic and values_list is not sorted, e.g.:

    id   values_list
    1    ["a", "c", "b"]
    3    ["g", "i", "h"]
    2    ["f", "e", "d"]
    

The gluten physical plan:

== Physical Plan ==
AdaptiveSparkPlan isFinalPlan=true
+- == Final Plan ==
   VeloxColumnarToRowExec
   +- ^(9) HashAggregateTransformer(keys=[id#0], functions=[velox_collect_list(value#1)], isStreamingAgg=false, output=[id#0, values_list#22])
      +- ^(9) HashAggregateTransformer(keys=[id#0], functions=[partial_velox_collect_list(value#1)], isStreamingAgg=false, output=[id#0, buffer#29])
         +- ^(9) InputIteratorTransformer[id#0, value#1]
            +- ShuffleQueryStage 1
               +- ColumnarExchange hashpartitioning(id#0, 20), REPARTITION_WITH_NUM, [id#0, value#1], [id=#1293], [id=#1293], [OUTPUT] List(id:IntegerType, value:StringType), [OUTPUT] List(id:IntegerType, value:StringType)
                  +- VeloxAppendBatches 3276
                     +- ^(8) ProjectExecTransformer [hash(id#0, 42) AS hash_partition_key#31, id#0, value#1]
                        +- ^(8) InputIteratorTransformer[id#0, value#1, _nondeterministic#24]
                           +- ShuffleQueryStage 0
                              +- ColumnarExchange hashpartitioning(_nondeterministic#24, 20), REPARTITION_WITH_NUM, [id#0, value#1, _nondeterministic#24], [id=#1239], [id=#1239], [OUTPUT] List(id:IntegerType, value:StringType, _nondeterministic:DoubleType), [OUTPUT] List(id:IntegerType, value:StringType, _nondeterministic:DoubleType)
                                 +- VeloxAppendBatches 3276
                                    +- ^(7) ProjectExecTransformer [hash(_nondeterministic#24, 42) AS hash_partition_key#30, id#0, value#1, _nondeterministic#24]
                                       +- ^(7) InputIteratorTransformer[id#0, value#1, _nondeterministic#24]
                                          +- RowToVeloxColumnar
                                             +- LocalTableScan [id#0, value#1, _nondeterministic#24]
+- == Initial Plan ==
   SortAggregate(key=[id#0], functions=[velox_collect_list(value#1)], output=[id#0, values_list#22])
   +- SortAggregate(key=[id#0], functions=[partial_velox_collect_list(value#1)], output=[id#0, buffer#29])
      +- Sort [id#0 ASC NULLS FIRST, value#1 ASC NULLS FIRST], false, 0
         +- Exchange hashpartitioning(id#0, 20), REPARTITION_WITH_NUM, [id=#1211]
            +- Project [id#0, value#1]
               +- Exchange hashpartitioning(_nondeterministic#24, 20), REPARTITION_WITH_NUM, [id=#1209]
                  +- LocalTableScan [id#0, value#1, _nondeterministic#24]

Even though collect_list is documented as non-deterministic, some ETL tasks in our production environment depend on the sorted output that vanilla Spark produces here.
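To illustrate the mechanism (a minimal Python sketch, not Gluten/Velox code): collect_list simply appends rows in the order the aggregation operator receives them, so the result is sorted only if a per-partition Sort runs before the partial aggregate. Once that Sort is dropped, the element order is whatever the random shuffle from `DISTRIBUTE BY rand()` happened to produce:

```python
import random

def collect_list_by_key(rows):
    """Group rows by key, appending values in the order they arrive,
    mimicking how collect_list accumulates its input."""
    result = {}
    for key, value in rows:
        result.setdefault(key, []).append(value)
    return result

rows = [(1, "a"), (1, "b"), (1, "c"), (2, "d"), (2, "e"), (2, "f")]

# Vanilla plan: a Sort on (id, value) runs before the partial aggregate,
# so values are collected in sorted order.
sorted_input = sorted(rows)
print(collect_list_by_key(sorted_input))  # {1: ['a', 'b', 'c'], 2: ['d', 'e', 'f']}

# Gluten plan after dropPartialSort: the aggregate sees the arrival order
# left over from the random shuffle; the order within each list varies
# between runs, although the set of elements is the same.
shuffled_input = rows[:]
random.shuffle(shuffled_input)
print(collect_list_by_key(shuffled_input))
```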

Root cause for this issue

As the gluten physical plan shows, the Sort operator has been removed. This appears to be caused by the following code snippet: code link.

class ReplaceSingleNode() extends LogLevelUtil with Logging {

  def doReplace(p: SparkPlan): SparkPlan = p match {
    // ...
    case plan: SortAggregateExec =>
      logDebug(s"Columnar Processing for ${plan.getClass} is currently supported.")
      HashAggregateExecBaseTransformer.from(plan)(SortUtils.dropPartialSort)
    // ...
  }
}

object SortUtils {
  def dropPartialSort(plan: SparkPlan): SparkPlan = plan match {
    case RewrittenNodeWall(p) => RewrittenNodeWall(dropPartialSort(p))
    case PartialSortLike(child) => child
    // from pre/post project-pulling
    case ProjectLike(PartialSortLike(ProjectLike(child))) if plan.outputSet == child.outputSet =>
      child
    case ProjectLike(PartialSortLike(child)) => plan.withNewChildren(Seq(child))
    case _ => plan
  }
}

I'm wondering why the partial sort added by SQL `SORT BY` needs to be removed for SortAggregateExec. Would it be possible to retain the partial sort operator to resolve this issue?
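One possible direction, sketched here in Python with a toy plan tree rather than Spark's SparkPlan (the node names and the ORDER_SENSITIVE_AGGS list are hypothetical, not Gluten APIs): make dropPartialSort conditional, so the partial sort is retained whenever any aggregate function in the node is order-sensitive, while order-insensitive aggregates still benefit from dropping it.

```python
from dataclasses import dataclass, field
from typing import List

@dataclass
class Plan:
    """Toy stand-in for a physical plan node."""
    name: str
    children: List["Plan"] = field(default_factory=list)

# Illustrative list of aggregates whose result depends on input order.
ORDER_SENSITIVE_AGGS = {"collect_list", "first", "last"}

def drop_partial_sort(plan: Plan, agg_functions: List[str]) -> Plan:
    """Drop a partial Sort under the aggregate, but only when no
    aggregate function in the node is order-sensitive."""
    if any(f in ORDER_SENSITIVE_AGGS for f in agg_functions):
        return plan  # keep the sort: the result depends on row order
    if plan.name == "PartialSort":
        return plan.children[0]
    return plan

sort = Plan("PartialSort", [Plan("Exchange")])
print(drop_partial_sort(sort, ["sum"]).name)           # Exchange: sort dropped
print(drop_partial_sort(sort, ["collect_list"]).name)  # PartialSort: sort kept
```

The trade-off is that SortAggregateExec is still replaced by the hash aggregate transformer, but the upstream sort survives, so order-sensitive aggregates see their input in the order the user's `SORT BY` established.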

Spark version

None

Spark configurations

No response

System information

No response

Relevant logs

No response
