-
Notifications
You must be signed in to change notification settings - Fork 1.9k
Description
Describe the bug
I have been troubleshooting the TPCH-DS query 64, and have found some performance issues, seemingly stemming from ExecutionPlan::output_partitioning, ExecutionPlan::equivalence_properties and their inherent branching nature throughout various ExecutionPlan implementations.
In particular the problem manifests by the process pinning the CPU at 100%, and getting stuck in the EnforceDistribution physical optimizer, which aggravates the underlying problem by inter-leaving RepartitionExec into the existing plan tree.
To Reproduce
Setup
You can use the existing tpcds_physical_q64 test, just un-ignore it and set env_logger::init() somewhere to capture the logs. You'll also need to set RUST_MIN_STACK=3000000 to alleviate an unrelated problem described in #4786.
Flamegraph
Next you can optionally capture the flamegraph to get an initial sense of what's happening
sudo cargo flamegraph --dev -v --test tpcds_planning -- tpcds_physical_q64I see something like flamegraph.svg.zip.
Notably, there is a large build-up of (likely under-sampled) output_partitioning and equivalence_properties calls.
Investigation
I used some counters
pub struct Counter {
count: std::sync::Mutex<u32>,
}
impl Counter {
pub fn new() -> Counter {
Counter {
count: std::sync::Mutex::new(0),
}
}
pub fn reset(&self) {
let mut counter = self.count.lock().unwrap();
*counter = 0;
}
pub fn increment(&self) {
let mut counter = self.count.lock().unwrap();
*counter += 1;
}
pub fn get(&self) -> u32 {
let counter = self.count.lock().unwrap();
*counter
}
}
use once_cell::sync::Lazy;
pub static OUT_PART_COUNTER: Lazy<Arc<Counter>> = Lazy::new(|| Arc::new(Counter::new()));
pub static EQ_PROP_COUNTER: Lazy<Arc<Counter>> = Lazy::new(|| Arc::new(Counter::new()));and then sprinkled crate::OUT_PART_COUNTER.increment() and crate::EQ_PROP_COUNTER.increment() at the begining of fn output_partitioning(&self) and fn equivalence_properties(&self) respectively, for MemoryExec, RepartitionExec, HashJoinExec, ProjectionExec and AggregateExec.
I also added the following
diff --git a/datafusion/core/src/physical_optimizer/enforce_distribution.rs b/datafusion/core/src/physical_optimizer/enforce_distribution.rs
index 0c5c2d78b..d4c0ddc22 100644
--- a/datafusion/core/src/physical_optimizer/enforce_distribution.rs
+++ b/datafusion/core/src/physical_optimizer/enforce_distribution.rs
@@ -208,9 +208,16 @@ impl PhysicalOptimizerRule for EnforceDistribution {
};
let distribution_context = DistributionContext::new_default(adjusted);
+ use datafusion_physical_plan::{OUT_PART_COUNTER, EQ_PROP_COUNTER};
+ OUT_PART_COUNTER.reset();
+ EQ_PROP_COUNTER.reset();
// Distribution enforcement needs to be applied bottom-up.
let distribution_context =
distribution_context.transform_up(&|distribution_context| {
+ log::warn!("output_partitioning {}, equivalence_properties {}", OUT_PART_COUNTER.get(), EQ_PROP_COUNTER.get());
+ OUT_PART_COUNTER.reset();
+ EQ_PROP_COUNTER.reset();
+
ensure_distribution(distribution_context, config)
})?;
Ok(distribution_context.plan)Results
Soon enough after starting the test you'll notice these numbers reach values on the order of 100K and then millions and more as the slowdown in the iterations becomes appreciable
[2024-01-31T13:58:01Z WARN datafusion::physical_optimizer::enforce_distribution] output_partitioning 40955, equivalence_properties 58363
[2024-01-31T13:58:01Z WARN datafusion::physical_optimizer::enforce_distribution] output_partitioning 0, equivalence_properties 0
[2024-01-31T13:58:03Z WARN datafusion::physical_optimizer::enforce_distribution] output_partitioning 245749, equivalence_properties 350184
[2024-01-31T13:58:03Z WARN datafusion::physical_optimizer::enforce_distribution] output_partitioning 81915, equivalence_properties 116731
[2024-01-31T13:58:03Z WARN datafusion::physical_optimizer::enforce_distribution] output_partitioning 0, equivalence_properties 0
[2024-01-31T13:58:06Z WARN datafusion::physical_optimizer::enforce_distribution] output_partitioning 491509, equivalence_properties 700392
[2024-01-31T13:58:07Z WARN datafusion::physical_optimizer::enforce_distribution] output_partitioning 163835, equivalence_properties 233467
[2024-01-31T13:58:07Z WARN datafusion::physical_optimizer::enforce_distribution] output_partitioning 0, equivalence_properties 0
[2024-01-31T13:58:12Z WARN datafusion::physical_optimizer::enforce_distribution] output_partitioning 983029, equivalence_properties 1400808
[2024-01-31T13:58:14Z WARN datafusion::physical_optimizer::enforce_distribution] output_partitioning 327675, equivalence_properties 466939
[2024-01-31T13:58:14Z WARN datafusion::physical_optimizer::enforce_distribution] output_partitioning 0, equivalence_properties 0
[2024-01-31T13:58:25Z WARN datafusion::physical_optimizer::enforce_distribution] output_partitioning 1966069, equivalence_properties 2801640
[2024-01-31T13:58:29Z WARN datafusion::physical_optimizer::enforce_distribution] output_partitioning 655355, equivalence_properties 933883
[2024-01-31T13:58:29Z WARN datafusion::physical_optimizer::enforce_distribution] output_partitioning 0, equivalence_properties 0
[2024-01-31T13:58:29Z WARN datafusion::physical_optimizer::enforce_distribution] output_partitioning 0, equivalence_properties 1
[2024-01-31T13:58:29Z WARN datafusion::physical_optimizer::enforce_distribution] output_partitioning 0, equivalence_properties 1
[2024-01-31T13:58:51Z WARN datafusion::physical_optimizer::enforce_distribution] output_partitioning 3932157, equivalence_properties 5603310
[2024-01-31T13:59:13Z WARN datafusion::physical_optimizer::enforce_distribution] output_partitioning 3932151, equivalence_properties 5603324
[2024-01-31T13:59:50Z WARN datafusion::physical_optimizer::enforce_distribution] output_partitioning 6553589, equivalence_properties 9338876
[2024-01-31T14:02:12Z WARN datafusion::physical_optimizer::enforce_distribution] output_partitioning 24903668, equivalence_properties 35487723Explanation
WARNING: A lot of gritty details below
Taking a look at one of the plans that appears early in the optimization process:
[
"AggregateExec: mode=Partial, gby=[cs_item_sk@0 as cs_item_sk], aggr=[SUM(catalog_sales.cs_ext_list_price), SUM(catalog_returns.cr_refunded_cash + catalog_returns.cr_reversed_charge + catalog_returns.cr_store_credit)]",
" ProjectionExec: expr=[cs_item_sk@0 as cs_item_sk, cs_ext_list_price@2 as cs_ext_list_price, cr_refunded_cash@5 as cr_refunded_cash, cr_reversed_charge@6 as cr_reversed_charge, cr_store_credit@7 as cr_store_credit]",
" HashJoinExec: mode=Partitioned, join_type=Inner, on=[(cs_item_sk@0, cr_item_sk@0), (cs_order_number@1, cr_order_number@1)]",
" RepartitionExec: partitioning=Hash([cs_item_sk@0, cs_order_number@1], 12), input_partitions=0",
" MemoryExec: partitions=0, partition_sizes=[]",
" RepartitionExec: partitioning=Hash([cr_item_sk@0, cr_order_number@1], 12), input_partitions=0",
" MemoryExec: partitions=0, partition_sizes=[]",
]and exploring the relevant implementations of output_partitioning and equivalence_properties, you can see that they have the potential to branch off into calling two methods on the input, thus leading to a exponential call tree.
In particular calling just output_partitioning() on the top-most plan leads to:
- in
AggregateExec::output_partitioning1 x out_part() + 2 x input.out_part() + input.eq_prop()
- for the
ProjectionExecnode below this gets expanded to1 x out_part() + 2 x (out_part() + input.out_part() + input.eq_prop()) + (eq_prop() + input.eq_prop()) = 3 x out_part() + eq_prop() + 2 x input.out_part() + 3 x input.eq_prop()
- Subsequently,
HashJoinExecwill expand these calls to left and right inputs3 x out_part() + eq_prop() + 2 x (out_part() + left.out_part() + right.out_part()) + 3 x (eq_prop() + left.eq_prop() + right.eq_prop()) = 5 x out_part() + 4 x eq_prop() + 2 x (left.out_part() + right.out_part()) + 3 x (left.eq_prop() + right.eq_prop())
- Next, given that both left and right inputs above are
RepartitionExecs which are leaf call-nodes forout_part(), but also expandeq_propinto 2 of inputs methods we get5 x out_part() + 4 x eq_prop() + 2 x (out_part() + out_part()) + 3 x (eq_prop() + l_input.eq_prop() + l_input.out_part() + eq_prop() + r_input.eq_prop() + r_input.out_part()) = 9 x out_part() + 10 x eq_prop() + 6 x input.eq_prop() + 6 x input.eq_prop() =
- Lastly the bottom plan nodes are
MemoryExecs which terminate theeq_prop()call chain as well9 x out_part() + 10 x eq_prop() + 6 x eq_prop() + 6 x eq_prop() = 15 x out_part() + 16 x eq_prop()
So for computing 1 thing (Partitioning), from a total of 7 plans the total invocation count for the above two methods was 31, thus with some overlapping. Note that some of these calls involve allocating stuff on the heap as well as some other computations, which can add up when the invocation count grows substantially.
Finally, given that these 2 methods are liberally called inside ensure_distribution and its sub-routines, I think this explains the enormous call count and ultimately the slow-down.
Expected behavior
Some potential mitigations:
- Delay calling and re-use results from
output_partitioningandequivalence_propertieswithin methods. For instance inAggregateExecthe second call toinput.output_partitioningis redundant and the call toinput.equivalence_properties()can also be delayed only in case of a match (and likewise forProjectionExec). - A proper solution would probably involve
EnforceDistributioninterleaving some helper plans alongside withRepartitionExecwhich serve to cache/short-circuit theoutput_partitioningandequivalence_propertiesalready computed for the input below. - Alternatively, some kind of visitor/memoization implementation that records the output the two methods bottom-up in the node tree might also be viable.
Additional context
No response