What is the problem the feature request solves?
I noticed that we execute each query stage with two separate native plans.
For example, here is the first query stage for TPC-H q1:
```
+- CometExchange: Hash partitioning on [l_returnflag, l_linestatus]
   +- CometHashAggregate (Partial): keys = [l_returnflag, l_linestatus]
      +- CometProject
         +- CometFilter: l_shipdate NOT NULL AND l_shipdate <= 1998-09-24
            +- CometScan: lineitem.parquet
```
We execute one plan for the aggregate:
```
AggregateExec: mode=Partial, gby=[col_4@4 as col_0, col_5@5 as col_1], aggr=[sum, sum, sum, sum, avg, avg, avg, count]
  ProjectionExec: expr=[col_0@0 as col_0, col_1@1 as col_1, col_2@2 as col_2, col_3@3 as col_3, col_4@4 as col_4, col_5@5 as col_5]
    FilterExec: col_6@6 IS NOT NULL AND col_6@6 <= 1998-09-24
      ScanExec: source=[CometScan parquet (unknown)]
```
We then stream those results back into the JVM, only to stream them back out again to a second native plan that performs the shuffle write:
```
ShuffleWriterExec: partitioning=Hash([Column { name: "col_0", index: 0 }, Column { name: "col_1", index: 1 }], 200)
  ScanExec: source=[], schema=[col_0: Utf8, col_1: Utf8, ..]
```
Would it be possible to combine these so that we just have the following plan? This would avoid a lot of JNI back and forth between the aggregate and the shuffle write.
```
ShuffleWriterExec: partitioning=Hash([Column { name: "col_0", index: 0 }, Column { name: "col_1", index: 1 }], 200)
  AggregateExec: mode=Partial, gby=[col_4@4 as col_0, col_5@5 as col_1], aggr=[sum, sum, sum, sum, avg, avg, avg, count]
    ProjectionExec: expr=[col_0@0 as col_0, col_1@1 as col_1, col_2@2 as col_2, col_3@3 as col_3, col_4@4 as col_4, col_5@5 as col_5]
      FilterExec: col_6@6 IS NOT NULL AND col_6@6 <= 1998-09-24
        ScanExec: source=[CometScan parquet (unknown)]
```
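For illustration only, here is a minimal Rust sketch of what a fused, single-tree plan looks like at the DataFusion `ExecutionPlan` level. This is not Comet's planner code: stock `RepartitionExec` stands in for Comet's `ShuffleWriterExec` (which writes Spark shuffle files rather than repartitioning in-process), a `MemoryExec` stands in for the aggregate/projection/filter/scan subtree, and the helper name `build_fused_stage` is made up. The point is simply that the partitioning operator can take the upstream plan as its direct child instead of reading from a JVM-backed `ScanExec` in a separate plan.

```rust
use std::sync::Arc;

use datafusion::arrow::array::StringArray;
use datafusion::arrow::datatypes::{DataType, Field, Schema};
use datafusion::arrow::record_batch::RecordBatch;
use datafusion::error::Result;
use datafusion::physical_expr::expressions::col;
use datafusion::physical_plan::memory::MemoryExec;
use datafusion::physical_plan::repartition::RepartitionExec;
use datafusion::physical_plan::{displayable, ExecutionPlan, Partitioning};

/// Builds a single fused plan: a hash-partitioning operator wrapping the
/// upstream plan directly, with no JVM-backed scan boundary in between.
fn build_fused_stage() -> Result<Arc<dyn ExecutionPlan>> {
    // Stand-in for the output schema of the partial aggregate subtree
    // (col_0 = l_returnflag, col_1 = l_linestatus).
    let schema = Arc::new(Schema::new(vec![
        Field::new("col_0", DataType::Utf8, false),
        Field::new("col_1", DataType::Utf8, false),
    ]));
    let batch = RecordBatch::try_new(
        schema.clone(),
        vec![
            Arc::new(StringArray::from(vec!["N", "R"])),
            Arc::new(StringArray::from(vec!["O", "F"])),
        ],
    )?;

    // Stand-in for the AggregateExec/ProjectionExec/FilterExec/ScanExec subtree.
    let upstream: Arc<dyn ExecutionPlan> =
        Arc::new(MemoryExec::try_new(&[vec![batch]], schema.clone(), None)?);

    // The key idea: the partitioning operator takes the upstream plan as its
    // direct child, so the whole query stage is one native plan tree.
    let partitioning = Partitioning::Hash(
        vec![col("col_0", &schema)?, col("col_1", &schema)?],
        200,
    );
    Ok(Arc::new(RepartitionExec::try_new(upstream, partitioning)?))
}

fn main() -> Result<()> {
    let plan = build_fused_stage()?;
    // Prints a single tree: RepartitionExec -> MemoryExec.
    println!("{}", displayable(plan.as_ref()).indent(true));
    Ok(())
}
```

In Comet terms, the change would presumably be in how the native plan for a stage is assembled, so that the plan sent across JNI contains the shuffle writer and the partial aggregate in one tree rather than two plans bridged by a scan.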
Describe the potential solution
No response
Additional context
No response