Description
I noticed an inconsistency in how the optimizer handles ordering in certain scenarios, particularly involving the prefer_existing_sort configuration and the way AggregateExec derives its required input ordering when it is created.
1. Background on prefer_existing_sort
The prefer_existing_sort configuration, used by the enforce_distribution optimizer rule, determines whether the optimizer keeps an order-preserving RepartitionExec or replaces it with a non-order-preserving one. If it switches to the non-order-preserving variant but the ordering was satisfied before repartitioning, it adds a SortExec above the RepartitionExec so that the ordering requirement still holds.
datafusion/datafusion/core/src/physical_optimizer/enforce_distribution.rs, lines 1279 to 1293 at commit 5edb276:
```rust
if (!ordering_satisfied || !order_preserving_variants_desirable)
    && child.data
{
    child = replace_order_preserving_variants(child)?;
    // If ordering requirements were satisfied before repartitioning,
    // make sure ordering requirements are still satisfied after.
    if ordering_satisfied {
        // Make sure to satisfy ordering requirement:
        child = add_sort_above_with_check(
            child,
            required_input_ordering.clone(),
            None,
        );
    }
}
```
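To make this branch concrete, here is a minimal, hypothetical Rust model of the decision it makes. `decide`, `Outcome`, and `has_order_preserving_variants` are illustrative names, not DataFusion APIs. We read `child.data` as "the child contains order-preserving variants that could be replaced" and, as a simplifying assumption, treat `order_preserving_variants_desirable` as equal to `prefer_existing_sort` (the real rule may take other factors into account).

```rust
/// What the quoted branch does to an order-preserving RepartitionExec.
/// Simplified, hypothetical model; not DataFusion's API.
#[derive(Debug, PartialEq)]
enum Outcome {
    /// Keep the order-preserving RepartitionExec as is.
    KeepOrderPreserving,
    /// Swap in a plain RepartitionExec and add a SortExec above it,
    /// because the ordering was satisfied before the swap.
    ReplaceAndSort,
    /// Swap in a plain RepartitionExec; this branch adds no SortExec
    /// because the ordering was not satisfied to begin with.
    ReplaceOnly,
    /// The child has no order-preserving variants, so the branch is skipped.
    Unchanged,
}

/// Mirrors `(!ordering_satisfied || !order_preserving_variants_desirable) && child.data`.
/// ASSUMPTION: `order_preserving_variants_desirable` is reduced to `prefer_existing_sort`.
fn decide(
    ordering_satisfied: bool,
    prefer_existing_sort: bool,
    has_order_preserving_variants: bool,
) -> Outcome {
    if !has_order_preserving_variants {
        return Outcome::Unchanged;
    }
    if !ordering_satisfied || !prefer_existing_sort {
        if ordering_satisfied {
            Outcome::ReplaceAndSort
        } else {
            Outcome::ReplaceOnly
        }
    } else {
        Outcome::KeepOrderPreserving
    }
}

fn main() {
    // Ordering satisfied, prefer_existing_sort = false: order is re-established by a sort.
    println!("{:?}", decide(true, false, true));
    // Ordering satisfied, prefer_existing_sort = true: the order-preserving variant stays.
    println!("{:?}", decide(true, true, true));
}
```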
2. Creation Behavior of AggregateExec
AggregateExec sets its required_input_ordering based on its group-by expressions (extended with any ordering derived from its aggregate expressions), without checking any configuration such as prefer_existing_sort. This effectively makes the ordering a hard requirement.
datafusion/datafusion/physical-plan/src/aggregates/mod.rs, lines 461 to 479 at commit 5edb276:
```rust
let indices = get_ordered_partition_by_indices(&groupby_exprs, &input);
let mut new_requirement = LexRequirement::new(
    indices
        .iter()
        .map(|&idx| PhysicalSortRequirement {
            expr: Arc::clone(&groupby_exprs[idx]),
            options: None,
        })
        .collect::<Vec<_>>(),
);
let req = get_finer_aggregate_exprs_requirement(
    &mut aggr_expr,
    &group_by,
    input_eq_properties,
    &mode,
)?;
new_requirement.inner.extend(req);
new_requirement = new_requirement.collapse();
```
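The requirement construction above can be sketched as a toy model. `SortReq`, `build_requirement`, and `collapse` here are hypothetical stand-ins that use plain strings instead of physical expressions. We assume `ordered_indices` plays the role of `get_ordered_partition_by_indices` (the positions of group-by expressions the input is already ordered on) and that `collapse` drops duplicate entries while keeping the first occurrence. The point of the sketch is that no configuration flag appears among the inputs.

```rust
/// Hypothetical stand-in for a sort requirement.
/// `descending: None` means "either direction is fine", like `options: None`.
#[derive(Debug, Clone, PartialEq)]
struct SortReq {
    expr: String,
    descending: Option<bool>,
}

/// Keeps the first occurrence of each requirement (our reading of `collapse`).
fn collapse(reqs: Vec<SortReq>) -> Vec<SortReq> {
    let mut out: Vec<SortReq> = Vec::new();
    for r in reqs {
        if !out.contains(&r) {
            out.push(r);
        }
    }
    out
}

/// `ordered_indices` stands in for `get_ordered_partition_by_indices`;
/// `aggr_reqs` stands in for `get_finer_aggregate_exprs_requirement`.
/// Note that there is no prefer_existing_sort parameter.
fn build_requirement(
    groupby_exprs: &[&str],
    ordered_indices: &[usize],
    aggr_reqs: Vec<SortReq>,
) -> Vec<SortReq> {
    let mut req: Vec<SortReq> = ordered_indices
        .iter()
        .map(|&idx| SortReq { expr: groupby_exprs[idx].to_string(), descending: None })
        .collect();
    req.extend(aggr_reqs);
    collapse(req)
}

fn main() {
    // Input already ordered on `a`; grouping by (a, b) turns that into a requirement on `a`.
    let req = build_requirement(&["a", "b"], &[0], vec![]);
    for r in &req {
        println!("{} {:?}", r.expr, r.descending);
    }
}
```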
3. The issue
When these two behaviors interact, the ordering is always enforced: if the order is preserved below a RepartitionExec and an AggregateExec sits above it, the optimizer adds a SortExec no matter how prefer_existing_sort is set, because the aggregate's ordering is now a hard requirement. The resulting plan looks like this:
```
AggregateExec: mode=FinalPartitioned, ...
  SortExec: ..., preserve_partitioning=[true]
    RepartitionExec: ....
      CsvExec: ...
```
While AggregateExec benefits from receiving ordered input, adding a SortExec in this context can incur a significant performance cost, negating any benefits of preserving the order.
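A toy rendering of the path where the repartition does not preserve order (prefer_existing_sort = false) shows that the SortExec in the plan above comes from the aggregate's requirement rather than from the configuration. `plan_with_plain_repartition` is hypothetical and emits node names only; it is not real EXPLAIN output and does not model the prefer_existing_sort = true path.

```rust
/// Hypothetical toy planner for the non-order-preserving repartition path only.
/// Returns the plan top-down, indented two spaces per level.
fn plan_with_plain_repartition(aggregate_requires_order: bool) -> Vec<String> {
    let mut nodes = vec!["AggregateExec: mode=FinalPartitioned".to_string()];
    // The repartition no longer preserves order, so a hard requirement
    // from the aggregate can only be met by re-sorting each partition.
    if aggregate_requires_order {
        nodes.push("SortExec: preserve_partitioning=[true]".to_string());
    }
    nodes.push("RepartitionExec".to_string());
    nodes.push("CsvExec".to_string());
    nodes
        .into_iter()
        .enumerate()
        .map(|(depth, node)| format!("{}{}", "  ".repeat(depth), node))
        .collect()
}

fn main() {
    for line in plan_with_plain_repartition(true) {
        println!("{line}");
    }
}
```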
4. Possible solutions
A straightforward approach would be for AggregateExec to check the prefer_existing_sort configuration before adding ordering requirements. However, this raises two problems:
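- The prefer_existing_sort setting lives at the optimizer level, and injecting it into AggregateExec would couple an execution plan node to optimizer configuration, which seems like poor design.
- Evaluating this configuration at runtime also feels conceptually incorrect, since it governs a planning-time decision.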
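For discussion, here is a hypothetical sketch of the straightforward approach, under the reading that the group-by ordering requirement is only added when prefer_existing_sort is true. `ToyAggregate` is illustrative and is not DataFusion's AggregateExec; it makes the first concern visible, because the node's constructor now needs an optimizer-level setting passed in.

```rust
/// Hypothetical sketch only; not DataFusion's AggregateExec API.
struct ToyAggregate {
    required_input_ordering: Vec<String>,
}

impl ToyAggregate {
    /// ASSUMPTION: `prefer_existing_sort` is threaded in from the optimizer
    /// config, which is exactly the coupling the issue is concerned about.
    fn new(groupby_exprs: &[&str], ordered_indices: &[usize], prefer_existing_sort: bool) -> Self {
        let required_input_ordering = if prefer_existing_sort {
            ordered_indices
                .iter()
                .map(|&i| groupby_exprs[i].to_string())
                .collect()
        } else {
            // Skip the requirement so the optimizer is never forced to add a SortExec.
            Vec::new()
        };
        ToyAggregate { required_input_ordering }
    }
}

fn main() {
    let agg = ToyAggregate::new(&["a", "b"], &[0], false);
    println!("requirement: {:?}", agg.required_input_ordering);
}
```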
Given these challenges, I wanted to open a discussion on alternative solutions or design approaches to address this behavior.
Looking forward to hearing the community's thoughts on this!