[VL] Result mismatch in CollectList when partial sort is involved #8184
Status: Open

Backend: VL (Velox)
Bug description
Reproducing SQL:

```sql
CREATE OR REPLACE TEMP VIEW temp_table AS
SELECT * FROM VALUES
  (1, 'a'), (1, 'b'), (1, 'c'),
  (2, 'd'), (2, 'e'), (2, 'f'),
  (3, 'g'), (3, 'h'), (3, 'i')
AS t(id, value);

SELECT id, collect_list(value) AS values_list
FROM (
  SELECT * FROM
    (SELECT id, value
     FROM temp_table
     DISTRIBUTE BY rand()) -- Forces a shuffle
  DISTRIBUTE BY id SORT BY id, value
) t
GROUP BY id;
```

Results:
- The vanilla result is deterministic and `values_list` is sorted by the `value` column:

  | id | values_list     |
  |----|-----------------|
  | 1  | ["a", "b", "c"] |
  | 2  | ["d", "e", "f"] |
  | 3  | ["g", "h", "i"] |

- The Gluten result is non-deterministic and `values_list` is not sorted, e.g.:

  | id | values_list     |
  |----|-----------------|
  | 1  | ["a", "c", "b"] |
  | 3  | ["g", "i", "h"] |
  | 2  | ["f", "e", "d"] |
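The mismatch follows directly from `collect_list`'s semantics: it accumulates values in arrival order and never sorts them itself, so the output order is whatever order rows reach the aggregate. A minimal pure-Scala model of that behavior (the object and data below are illustrative, not Spark code):

```scala
// Toy model of collect_list: gather values per key in arrival order.
// This illustrates the semantics; it is not Spark's implementation.
object CollectListModel {
  def collectList(rows: Seq[(Int, String)]): Map[Int, Seq[String]] =
    rows.foldLeft(Map.empty[Int, Seq[String]]) { case (acc, (id, v)) =>
      acc.updated(id, acc.getOrElse(id, Vector.empty) :+ v)
    }

  def main(args: Array[String]): Unit = {
    // With the partial sort in place (vanilla plan), rows arrive ordered,
    // so each group's list comes out sorted by value.
    val sortedArrival = Seq((1, "a"), (1, "b"), (1, "c"), (2, "d"), (2, "e"), (2, "f"))
    println(collectList(sortedArrival))   // id 1 -> a, b, c; id 2 -> d, e, f

    // With the sort dropped (Gluten plan), the random shuffle's arrival
    // order leaks straight into the result.
    val shuffledArrival = Seq((1, "a"), (2, "f"), (1, "c"), (2, "e"), (1, "b"), (2, "d"))
    println(collectList(shuffledArrival)) // id 1 -> a, c, b; id 2 -> f, e, d
  }
}
```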
The Gluten physical plan:

```
== Physical Plan ==
AdaptiveSparkPlan isFinalPlan=true
+- == Final Plan ==
   VeloxColumnarToRowExec
   +- ^(9) HashAggregateTransformer(keys=[id#0], functions=[velox_collect_list(value#1)], isStreamingAgg=false, output=[id#0, values_list#22])
      +- ^(9) HashAggregateTransformer(keys=[id#0], functions=[partial_velox_collect_list(value#1)], isStreamingAgg=false, output=[id#0, buffer#29])
         +- ^(9) InputIteratorTransformer[id#0, value#1]
            +- ShuffleQueryStage 1
               +- ColumnarExchange hashpartitioning(id#0, 20), REPARTITION_WITH_NUM, [id#0, value#1], [id=#1293], [id=#1293], [OUTPUT] List(id:IntegerType, value:StringType), [OUTPUT] List(id:IntegerType, value:StringType)
                  +- VeloxAppendBatches 3276
                     +- ^(8) ProjectExecTransformer [hash(id#0, 42) AS hash_partition_key#31, id#0, value#1]
                        +- ^(8) InputIteratorTransformer[id#0, value#1, _nondeterministic#24]
                           +- ShuffleQueryStage 0
                              +- ColumnarExchange hashpartitioning(_nondeterministic#24, 20), REPARTITION_WITH_NUM, [id#0, value#1, _nondeterministic#24], [id=#1239], [id=#1239], [OUTPUT] List(id:IntegerType, value:StringType, _nondeterministic:DoubleType), [OUTPUT] List(id:IntegerType, value:StringType, _nondeterministic:DoubleType)
                                 +- VeloxAppendBatches 3276
                                    +- ^(7) ProjectExecTransformer [hash(_nondeterministic#24, 42) AS hash_partition_key#30, id#0, value#1, _nondeterministic#24]
                                       +- ^(7) InputIteratorTransformer[id#0, value#1, _nondeterministic#24]
                                          +- RowToVeloxColumnar
                                             +- LocalTableScan [id#0, value#1, _nondeterministic#24]
+- == Initial Plan ==
   SortAggregate(key=[id#0], functions=[velox_collect_list(value#1)], output=[id#0, values_list#22])
   +- SortAggregate(key=[id#0], functions=[partial_velox_collect_list(value#1)], output=[id#0, buffer#29])
      +- Sort [id#0 ASC NULLS FIRST, value#1 ASC NULLS FIRST], false, 0
         +- Exchange hashpartitioning(id#0, 20), REPARTITION_WITH_NUM, [id=#1211]
            +- Project [id#0, value#1]
               +- Exchange hashpartitioning(_nondeterministic#24, 20), REPARTITION_WITH_NUM, [id=#1209]
                  +- LocalTableScan [id#0, value#1, _nondeterministic#24]
```
Even though `collect_list` is documented as non-deterministic, some ETL tasks in our production environment depend on this ordering behavior of vanilla Spark.
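As a stopgap, the ordering can be made explicit instead of relying on the plan: Spark's `sort_array` applied on top of `collect_list` yields a deterministically ordered array on any backend, at the cost of always sorting, even when the rows already arrived ordered. A pure-Scala sketch of that post-sort (an illustrative helper, not the Spark API):

```scala
// Illustrative post-sort: sorting the collected list restores a
// deterministic order regardless of arrival order. The Spark SQL
// equivalent would be sort_array(collect_list(value)).
object SortedCollect {
  def collectListSorted(rows: Seq[(Int, String)]): Map[Int, Seq[String]] =
    rows.groupBy(_._1).map { case (id, grp) => id -> grp.map(_._2).sorted }
}
```

Note this only helps when sorting by the collected values themselves is acceptable; it cannot reproduce an order driven by a separate sort key.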
Root cause for this issue
From the Gluten plan we can see that the Sort operator has been removed. This change appears to come from this code snippet: code link.
```scala
class ReplaceSingleNode() extends LogLevelUtil with Logging {
  def doReplace(p: SparkPlan): SparkPlan = {
    // ...
    case plan: SortAggregateExec =>
      logDebug(s"Columnar Processing for ${plan.getClass} is currently supported.")
      HashAggregateExecBaseTransformer.from(plan)(SortUtils.dropPartialSort)
    // ...
  }
}

object SortUtils {
  def dropPartialSort(plan: SparkPlan): SparkPlan = plan match {
    case RewrittenNodeWall(p) => RewrittenNodeWall(dropPartialSort(p))
    case PartialSortLike(child) => child
    // from pre/post project-pulling
    case ProjectLike(PartialSortLike(ProjectLike(child))) if plan.outputSet == child.outputSet =>
      child
    case ProjectLike(PartialSortLike(child)) => plan.withNewChildren(Seq(child))
    case _ => plan
  }
}
```
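To make the removal concrete, here is a toy model of the pattern match (the case classes are hypothetical stand-ins, not Gluten's actual `SparkPlan` types): a partial sort sitting directly under the aggregate input, possibly wrapped in a projection, is simply discarded.

```scala
// Hypothetical mini plan ADT mirroring the shapes dropPartialSort matches on.
sealed trait Plan
final case class Scan(cols: Seq[String]) extends Plan
final case class PartialSort(child: Plan, keys: Seq[String]) extends Plan
final case class Project(child: Plan, cols: Seq[String]) extends Plan

object ToySortUtils {
  // Same idea as SortUtils.dropPartialSort: strip a partial (per-partition)
  // sort feeding the aggregate, looking through one surrounding Project.
  def dropPartialSort(plan: Plan): Plan = plan match {
    case PartialSort(child, _)                => child
    case Project(PartialSort(child, _), cols) => Project(child, cols)
    case other                                => other
  }
}

object ToyDemo {
  def main(args: Array[String]): Unit = {
    // The reproducer's shape: SORT BY id, value above the shuffled scan.
    val input = PartialSort(Scan(Seq("id", "value")), Seq("id", "value"))
    // The sort vanishes, which matches the Gluten plan having no Sort
    // between the exchange and the aggregate.
    println(ToySortUtils.dropPartialSort(input)) // Scan(List(id, value))
  }
}
```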
I'm wondering why the partial sort introduced by SQL `SORT BY` needs to be removed for `SortAggregateExec`. Would it be possible to retain the partial sort operator to resolve this issue?
Spark version
None
Spark configurations
No response
System information
No response
Relevant logs
No response