Possible native shuffle optimization #977

@andygrove

What is the problem the feature request solves?

I noticed that we execute each query stage with two separate native plans.

For example, here is the first query stage for TPC-H q1:

+- CometExchange: Hash partitioning on [l_returnflag, l_linestatus]
   +- CometHashAggregate (Partial): keys = [l_returnflag, l_linestatus]
      +- CometProject
         +- CometFilter: l_shipdate NOT NULL AND l_shipdate <= 1998-09-24
            +- CometScan: lineitem.parquet

We execute one plan for the aggregate:

AggregateExec: mode=Partial, gby=[col_4@4 as col_0, col_5@5 as col_1], aggr=[sum, sum, sum, sum, avg, avg, avg, count]
  ProjectionExec: expr=[col_0@0 as col_0, col_1@1 as col_1, col_2@2 as col_2, col_3@3 as col_3, col_4@4 as col_4, col_5@5 as col_5]
    FilterExec: col_6@6 IS NOT NULL AND col_6@6 <= 1998-09-24
      ScanExec: source=[CometScan parquet  (unknown)]

We then stream those results back into the JVM, which streams them out again to a second native plan that performs the shuffle write:

ShuffleWriterExec: partitioning=Hash([Column { name: "col_0", index: 0 }, Column { name: "col_1", index: 1 }], 200)
  ScanExec: source=[], schema=[col_0: Utf8, col_1: Utf8, ..]

Would it be possible to combine these into the single plan below? This would avoid the JNI round trips between the aggregate and the shuffle write.

ShuffleWriterExec: partitioning=Hash([Column { name: "col_0", index: 0 }, Column { name: "col_1", index: 1 }], 200)
  AggregateExec: mode=Partial, gby=[col_4@4 as col_0, col_5@5 as col_1], aggr=[sum, sum, sum, sum, avg, avg, avg, count]
    ProjectionExec: expr=[col_0@0 as col_0, col_1@1 as col_1, col_2@2 as col_2, col_3@3 as col_3, col_4@4 as col_4, col_5@5 as col_5]
      FilterExec: col_6@6 IS NOT NULL AND col_6@6 <= 1998-09-24
        ScanExec: source=[CometScan parquet  (unknown)]
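
To make the proposed rewrite concrete, here is a minimal toy sketch of the plan fusion. It is not Comet or DataFusion code: `PlanNode`, `is_jvm_input`, `fuse`, and `render` are hypothetical names, the operator details are abbreviated, and it assumes that a leaf `ScanExec` with an empty `source=[]` marks the point where batches arrive from the JVM.

```python
from dataclasses import dataclass, field
from typing import List


@dataclass
class PlanNode:
    """Toy stand-in for a native operator; not a Comet or DataFusion type."""
    name: str
    detail: str = ""
    children: List["PlanNode"] = field(default_factory=list)


def is_jvm_input(node: PlanNode) -> bool:
    # Assumption: a leaf ScanExec with an empty source list is the placeholder
    # that receives batches from the JVM, as in the shuffle write plan above.
    return (
        node.name == "ScanExec"
        and node.detail.startswith("source=[]")
        and not node.children
    )


def fuse(shuffle_plan: PlanNode, upstream: PlanNode) -> PlanNode:
    """Return a copy of shuffle_plan with its JVM input scan replaced by upstream."""
    if is_jvm_input(shuffle_plan):
        return upstream
    return PlanNode(
        shuffle_plan.name,
        shuffle_plan.detail,
        [fuse(child, upstream) for child in shuffle_plan.children],
    )


def render(node: PlanNode, indent: int = 0) -> str:
    """Format a plan as an indented tree, two spaces per level."""
    line = " " * indent + node.name + (f": {node.detail}" if node.detail else "")
    return "\n".join([line] + [render(c, indent + 2) for c in node.children])


# Abbreviated versions of the two native plans from the issue.
aggregate_plan = PlanNode("AggregateExec", "mode=Partial", [
    PlanNode("ProjectionExec", "", [
        PlanNode("FilterExec", "col_6@6 IS NOT NULL", [
            PlanNode("ScanExec", "source=[CometScan parquet]"),
        ]),
    ]),
])
shuffle_plan = PlanNode("ShuffleWriterExec", "partitioning=Hash([col_0, col_1], 200)", [
    PlanNode("ScanExec", "source=[]"),
])

fused = fuse(shuffle_plan, aggregate_plan)
print(render(fused))
```

The sketch only rewrites the tree shape. A real implementation would also need the aggregate's output schema to match what the shuffle writer expects from its input scan.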

Describe the potential solution

No response

Additional context

No response
