Description
Problem statement
The projection pushdown optimizer rule / implementations generally only push down a projection if it "narrows" a schema (i.e. has fewer output expressions than input expressions) and the output expressions are all columns or literals:
datafusion/datafusion/physical-plan/src/filter.rs (lines 470 to 471 in d68b629):

    // If the projection does not narrow the schema, we should not try to push it down:
    if projection.expr().len() < projection.input().schema().fields().len() {
datafusion/datafusion/physical-plan/src/repartition/mod.rs (lines 1045 to 1055 in d68b629):

    // If the projection does not narrow the schema, we should not try to push it down.
    if projection.expr().len() >= projection.input().schema().fields().len() {
        return Ok(None);
    }

    // If pushdown is not beneficial or applicable, break it.
    if projection.benefits_from_input_partitioning()[0]
        || !all_columns(projection.expr())
    {
        return Ok(None);
    }
datafusion/datafusion/physical-plan/src/projection.rs (lines 255 to 268 in d68b629):

    fn benefits_from_input_partitioning(&self) -> Vec<bool> {
        let all_simple_exprs =
            self.projector
                .projection()
                .as_ref()
                .iter()
                .all(|proj_expr| {
                    proj_expr.expr.as_any().is::<Column>()
                        || proj_expr.expr.as_any().is::<Literal>()
                });
        // If expressions are all either column_expr or Literal, then all computations in this projection are reorder or rename,
        // and projection would not benefit from the repartition, benefits_from_input_partitioning will return false.
        vec![!all_simple_exprs]
    }
This is problematic with a plan like:
copy (
select 1 as id, named_struct('large_string_field', 'big text!', 'small_int_field', 2) as large_struct
)
TO 'struct.parquet';
create external table t stored as parquet location 'struct.parquet';
explain format indent
select large_struct['small_int_field'] * 2 from t where id = 1;
+---------------+------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
| plan_type | plan |
+---------------+------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
| logical_plan | Projection: get_field(t.large_struct, Utf8("small_int_field")) * Int64(2) |
| | Filter: t.id = Int64(1) |
| | TableScan: t projection=[id, large_struct], partial_filters=[t.id = Int64(1)] |
| physical_plan | ProjectionExec: expr=[get_field(large_struct@0, small_int_field) * 2 as t.large_struct[small_int_field] * Int64(2)] |
| | CoalesceBatchesExec: target_batch_size=8192 |
| | FilterExec: id@0 = 1, projection=[large_struct@1] |
| | RepartitionExec: partitioning=RoundRobinBatch(12), input_partitions=1 |
| | DataSourceExec: file_groups={1 group: [[Users/adrian/GitHub/datafusion/struct.parquet]]}, projection=[id, large_struct], file_type=parquet, predicate=id@0 = 1 |
| | |
+---------------+------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
The problem here is that get_field(large_struct@0, small_int_field) is not pushed down into the Parquet scan, forcing the entire struct column (including the large_string_field) to be read into memory (pending #11745, the Parquet machinery will be able to push down struct field access).
The reasoning for the current status quo is that evaluating an expression has a computational cost, so we certainly do not want to push it below filters, and probably not even below a CoalesceBatchesExec, so that the computation can be done on larger batches.
With RepartitionExec the logic is that computation benefits from re-partitioning, so it makes sense to do it after a repartition.
However, since the field access expression is essentially negative compute to evaluate in the Parquet scan, the optimal query plan would look like:
+---------------+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
| plan_type | plan |
+---------------+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
| logical_plan | Projection: get_field(t.large_struct, Utf8("small_int_field")) * Int64(2) |
| | Filter: t.id = Int64(1) |
| | TableScan: t projection=[id, large_struct], partial_filters=[t.id = Int64(1)] |
| physical_plan | ProjectionExec: expr=[large_struct[small_int_field] * 2 as t.large_struct[small_int_field] * Int64(2)] |
| | CoalesceBatchesExec: target_batch_size=8192 |
| | FilterExec: id@0 = 1, projection=[large_struct[small_int_field]@1] |
| | RepartitionExec: partitioning=RoundRobinBatch(12), input_partitions=1 |
| | DataSourceExec: file_groups={1 group: [[Users/adrian/GitHub/datafusion/struct.parquet]]}, projection=[id, large_struct[small_int_field]], file_type=parquet, predicate=id@0 = 1 |
| | |
+---------------+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
(Essentially push large_struct[small_int_field] down into the scan and pass it up as a column).
Solutions considered
Giving expressions the ability to determine their cost relative to a column selection
The idea would be to add a method to PhysicalExpr and ScalarFunctionUDF along the lines of should_push_down() -> bool. This would default to false, and we would set it to true only for Column and get_field(). We could also make it is_column_field_access() -> bool or column_field_path() -> Option<(Column, Vec<FieldPath>)>, which would be closer to the field path proposal below.
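As a rough illustration, here is a minimal sketch of what such a marker could look like; the trait and struct names below are assumptions for the example, not existing DataFusion APIs:

```rust
// Hypothetical sketch only; these names are not existing DataFusion APIs.
use std::sync::Arc;

trait PushablePhysicalExpr {
    /// True when evaluating this expression is no more expensive than reading
    /// the column it selects, so pushing it toward the scan is always a win.
    /// Defaults to false; only column references and field accesses opt in.
    fn should_push_down(&self) -> bool {
        false
    }
}

/// A bare column reference: always pushable.
struct ColumnExpr {
    index: usize,
}

impl PushablePhysicalExpr for ColumnExpr {
    fn should_push_down(&self) -> bool {
        true
    }
}

/// A `get_field(base, name)`-style struct field access: pushable whenever its
/// base is, because it only narrows the data the scan has to materialize.
struct GetFieldExpr {
    base: Arc<dyn PushablePhysicalExpr>,
    field_name: String,
}

impl PushablePhysicalExpr for GetFieldExpr {
    fn should_push_down(&self) -> bool {
        self.base.should_push_down()
    }
}
```

A pushdown rule (like the repartition / filter logic quoted above) could then consult should_push_down() instead of the current all_columns() check.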
I think we will also need to add functionality to "split" a ProjectionExec into two get_field(large_struct, small_int_field) * 2 -> col(0) * 2 (retained) and get_field(large_struct, small_int_field) (pushed down). I think this could be added as a method on ProjectionExec or ProjectionExprs.
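The following sketch shows the split over a toy expression type; it is illustrative only and does not use DataFusion's actual PhysicalExpr API:

```rust
// Toy model of the projection split; not DataFusion code.
#[derive(Clone, Debug, PartialEq)]
enum Expr {
    Column(usize),
    Literal(i64),
    GetField { base: Box<Expr>, field: String },
    Multiply(Box<Expr>, Box<Expr>),
}

/// Split one projection expression into a retained part and a list of
/// pushed-down expressions. Pushable sub-expressions are replaced with
/// Column references into `pushed`, which becomes the child's output.
fn split(expr: &Expr, pushed: &mut Vec<Expr>) -> Expr {
    match expr {
        // Bare columns and struct field accesses are pushable as-is.
        Expr::Column(_) | Expr::GetField { .. } => {
            // Reuse an identical pushed expression if present, otherwise add it.
            let idx = pushed.iter().position(|e| e == expr).unwrap_or_else(|| {
                pushed.push(expr.clone());
                pushed.len() - 1
            });
            Expr::Column(idx)
        }
        // Literals are free to evaluate; keep them in the retained projection.
        Expr::Literal(_) => expr.clone(),
        // Other computations stay retained; recurse into their children.
        Expr::Multiply(l, r) => {
            Expr::Multiply(Box::new(split(l, pushed)), Box::new(split(r, pushed)))
        }
    }
}

fn main() {
    // get_field(large_struct, 'small_int_field') * 2, where large_struct is column 1
    let expr = Expr::Multiply(
        Box::new(Expr::GetField {
            base: Box::new(Expr::Column(1)),
            field: "small_int_field".into(),
        }),
        Box::new(Expr::Literal(2)),
    );
    let mut pushed = Vec::new();
    let retained = split(&expr, &mut pushed);
    // retained: col(0) * 2
    // pushed:   [get_field(col(1), 'small_int_field')]
    println!("retained = {retained:?}\npushed = {pushed:?}");
}
```

The pushed-down list would become the projection handed to the scan (or embedded in FilterExec), while the retained expression stays in the ProjectionExec.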
Cost based pushdown / optimizer
This is a generalization of should_push_down(): consider something like pushdown_cost() -> f64.
A full blown cost-based optimizer would probably handle things this way, but given that there is no cost-based optimizer in DataFusion and no long-term plan for adding one, this seems like unnecessary complexity at this point.
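For completeness, the hook might look something like this (hypothetical name and signature, not an existing API):

```rust
// Hypothetical cost hook; not an existing DataFusion API.
trait ExprPushdownCost {
    /// Relative cost of evaluating this expression at the scan, where 0.0
    /// means "as cheap as selecting a column". A rule would push an
    /// expression down only when its cost is at or below some threshold.
    fn pushdown_cost(&self) -> f64 {
        f64::INFINITY
    }
}
```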
First class field path
Currently we have a Column expression that does not handle sub-fields/selections at all.
We could expand Column to have some sort of path: Option<Vec<String>> (maybe Vec<FieldPath>?).
This is enticing if it makes the system in general more aware of nested structures and if it can represent field access for both structs and json/variant. But that last point especially might be a bit sticky: do we want to be able to represent struct[list_field][0]? If not, do we end up with a mix of both worlds, i.e. get_field(column_path(struct, [list_field]), 0)?
There is also the issue that for structs the structure is known ahead of time, but for variant it can vary row by row, i.e. it can't be validated.
Finally, each one of these different path accesses is evaluated differently, both in how they are optimized at the scan level and how they are evaluated against in-memory data. So having Vec<FieldPath> is not enough. We'd need to retain a reference to how to evaluate that path as well (i.e. a reference to get_field or similar).
So the structure would end up looking something like:
    enum FieldPath {
        Field(String),
        Index(usize),
    }

    struct ColumnPath {
        column: Column,
        path: FieldPath,
        evaluate: Arc<dyn FieldPathEvaluator>, // ?
    }

And we would have to have an optimizer rule that converts get_field or variant_get into these sorts of expressions.
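To make that last step concrete, here is a rough sketch of such a conversion over a toy expression type (all names here are hypothetical, not DataFusion APIs): it collapses chains of get_field(column, 'name') rooted at a column into a single ColumnPath so the scan can see the full access path.

```rust
// Toy sketch of the get_field -> ColumnPath rewrite; not DataFusion code.
#[derive(Clone, Debug)]
enum FieldPath {
    Field(String),
    Index(usize),
}

#[derive(Clone, Debug)]
enum Expr {
    /// A bare column reference (by index into the input schema).
    Column(usize),
    /// A column plus a path of nested accesses, pushable into the scan.
    ColumnPath { column: usize, path: Vec<FieldPath> },
    /// A struct field access that has not (yet) been converted.
    GetField { base: Box<Expr>, field: String },
}

/// Rewrite chains of get_field rooted at a column into a single ColumnPath.
fn to_column_path(expr: &Expr) -> Expr {
    match expr {
        Expr::GetField { base, field } => match to_column_path(base) {
            // get_field over a bare column starts a new path.
            Expr::Column(column) => Expr::ColumnPath {
                column,
                path: vec![FieldPath::Field(field.clone())],
            },
            // get_field over an existing path extends it.
            Expr::ColumnPath { column, mut path } => {
                path.push(FieldPath::Field(field.clone()));
                Expr::ColumnPath { column, path }
            }
            // The base is something we cannot push (e.g. a computed struct);
            // keep the original get_field call.
            other => Expr::GetField {
                base: Box::new(other),
                field: field.clone(),
            },
        },
        other => other.clone(),
    }
}

fn main() {
    // get_field(large_struct, 'small_int_field'), where large_struct is column 1
    let expr = Expr::GetField {
        base: Box::new(Expr::Column(1)),
        field: "small_int_field".to_string(),
    };
    // Prints: ColumnPath { column: 1, path: [Field("small_int_field")] }
    println!("{:?}", to_column_path(&expr));
}
```

The evaluate handle on ColumnPath is what would dispatch between struct, list index, and variant semantics; this sketch omits it.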