Description
Is your feature request related to a problem or challenge?
There are many functions in datafusion-core that take `SessionState` as an argument but rely on only part of it. This adds unnecessary dependencies, which blocks us from extracting modules out of core (#10782).
For example, if we want to pull `CatalogProvider` out of core, we first need to pull out `TableProvider`. But `TableProvider::scan` takes a `SessionState`, which contains a `CatalogProviderList`, so there is a circular dependency. Similar issues have already been mentioned in #11182.
Describe the solution you'd like
I think we need to redesign the functions that take `SessionState` so that each one depends only on what it actually uses.
Take the `scan` function below as an example: it only needs `state.config_options().explain` and `state.execution_props()`, not the whole `SessionState`.
datafusion/datafusion/core/src/datasource/memory.rs, lines 207 to 245 in 4bed04e:

```rust
async fn scan(
    &self,
    state: &SessionState,
    projection: Option<&Vec<usize>>,
    _filters: &[Expr],
    _limit: Option<usize>,
) -> Result<Arc<dyn ExecutionPlan>> {
    let mut partitions = vec![];
    for arc_inner_vec in self.batches.iter() {
        let inner_vec = arc_inner_vec.read().await;
        partitions.push(inner_vec.clone())
    }
    let mut exec =
        MemoryExec::try_new(&partitions, self.schema(), projection.cloned())?;
    let show_sizes = state.config_options().explain.show_sizes;
    exec = exec.with_show_sizes(show_sizes);
    // add sort information if present
    let sort_order = self.sort_order.lock();
    if !sort_order.is_empty() {
        let df_schema = DFSchema::try_from(self.schema.as_ref().clone())?;
        let file_sort_order = sort_order
            .iter()
            .map(|sort_exprs| {
                create_physical_sort_exprs(
                    sort_exprs,
                    &df_schema,
                    state.execution_props(),
                )
            })
            .collect::<Result<Vec<_>>>()?;
        exec = exec.with_sort_information(file_sort_order);
    }
    Ok(Arc::new(exec))
}
```
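A minimal, self-contained sketch of the idea, under simplifying assumptions: `ExplainOptions`, `ExecutionProps`, `SessionState`, and `MemoryScan` below are hypothetical stand-ins for the real DataFusion types (only the fields used here are modelled), and `TableProviderContext` and `table_provider_context` are illustrative names. The context borrows the two pieces that `scan` reads, so nothing is copied, and the signature of `scan` never mentions the session type. Reading `query_execution_start_time` is only a placeholder for the real use of `ExecutionProps` in `create_physical_sort_exprs`.

```rust
/// Hypothetical stand-in for `ExplainOptions`; only the field `scan` reads.
#[derive(Debug, Clone, Default)]
pub struct ExplainOptions {
    pub show_sizes: bool,
}

/// Hypothetical stand-in for `ExecutionProps` (the real field is a timestamp).
#[derive(Debug, Clone, Default)]
pub struct ExecutionProps {
    pub query_execution_start_time: u64,
}

/// Narrow context that borrows only what `scan` reads, so nothing is copied.
pub struct TableProviderContext<'a> {
    pub explain_options: &'a ExplainOptions,
    pub execution_props: &'a ExecutionProps,
}

/// Stand-in for the full session state. It also owns things, such as the
/// catalog list, that a table provider should not depend on.
pub struct SessionState {
    pub explain_options: ExplainOptions,
    pub execution_props: ExecutionProps,
    pub catalog_names: Vec<String>,
}

impl SessionState {
    /// Borrow the subset of the session that table providers need.
    pub fn table_provider_context(&self) -> TableProviderContext<'_> {
        TableProviderContext {
            explain_options: &self.explain_options,
            execution_props: &self.execution_props,
        }
    }
}

/// Simplified scan result standing in for `MemoryExec`.
#[derive(Debug, PartialEq)]
pub struct MemoryScan {
    pub partitions: usize,
    pub show_sizes: bool,
    pub start_time: u64,
}

/// A `scan`-like function whose signature mentions only the narrow context,
/// so the module defining it never needs to see `SessionState`.
pub fn scan(ctx: &TableProviderContext<'_>, partitions: usize) -> MemoryScan {
    MemoryScan {
        partitions,
        show_sizes: ctx.explain_options.show_sizes,
        start_time: ctx.execution_props.query_execution_start_time,
    }
}

fn main() {
    let state = SessionState {
        explain_options: ExplainOptions { show_sizes: true },
        execution_props: ExecutionProps { query_execution_start_time: 42 },
        catalog_names: vec!["datafusion".to_string()],
    };
    let plan = scan(&state.table_provider_context(), 2);
    println!("{} catalog(s), {:?}", state.catalog_names.len(), plan);
}
```

Borrowing with a lifetime avoids cloning the options on every scan; an owned struct would also work if the copy turns out to be cheap.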
In this case, we could create a `TableProviderContext` that encapsulates the subset of `SessionState` that table providers need:

```rust
use datafusion_common::config::ExplainOptions;
use datafusion_expr::execution_props::ExecutionProps;

pub struct TableProviderContext {
    explain_options: ExplainOptions,
    execution_props: ExecutionProps, // maybe hold a reference to avoid a copy
}
```

The same idea applies to `PhysicalPlanner`: we usually need only the PhysicalOptimizeRules or other plan-related information.
datafusion/datafusion/core/src/physical_planner.rs, lines 364 to 385 in 4bed04e:

```rust
#[async_trait]
pub trait PhysicalPlanner: Send + Sync {
    /// Create a physical plan from a logical plan
    async fn create_physical_plan(
        &self,
        logical_plan: &LogicalPlan,
        session_state: &SessionState,
    ) -> Result<Arc<dyn ExecutionPlan>>;

    /// Create a physical expression from a logical expression
    /// suitable for evaluation
    ///
    /// `expr`: the expression to convert
    ///
    /// `input_dfschema`: the logical plan schema for evaluating `expr`
    fn create_physical_expr(
        &self,
        expr: &Expr,
        input_dfschema: &DFSchema,
        session_state: &SessionState,
    ) -> Result<Arc<dyn PhysicalExpr>>;
}
```
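One way this could look for the planner, sketched under simplifying assumptions: the planner receives a narrow `PlanningContext` trait object instead of `&SessionState`. `PlanningContext`, the stand-in `ExecutionOptions` and `SessionState` types, and the string-based plans are all hypothetical. The real trait is async and returns `Arc<dyn ExecutionPlan>`; both are omitted here to keep the example dependency-free.

```rust
/// Hypothetical stand-in for `ExecutionOptions`; only one field is modelled.
#[derive(Debug, Clone)]
pub struct ExecutionOptions {
    pub target_partitions: usize,
}

/// Hypothetical narrow interface: everything the planner may read.
/// A crate defining the planner depends on this trait, not on `SessionState`.
pub trait PlanningContext {
    fn execution_options(&self) -> &ExecutionOptions;
    /// Rule names stand in for the real physical optimizer rule objects.
    fn physical_optimizer_rules(&self) -> &[String];
}

/// Stand-in for the session; it implements the narrow trait.
pub struct SessionState {
    pub options: ExecutionOptions,
    pub rules: Vec<String>,
}

impl PlanningContext for SessionState {
    fn execution_options(&self) -> &ExecutionOptions {
        &self.options
    }
    fn physical_optimizer_rules(&self) -> &[String] {
        &self.rules
    }
}

/// Synchronous, string-based simplification of `PhysicalPlanner`.
pub trait PhysicalPlanner {
    fn create_physical_plan(&self, logical_plan: &str, ctx: &dyn PlanningContext) -> String;
}

pub struct DefaultPhysicalPlanner;

impl PhysicalPlanner for DefaultPhysicalPlanner {
    fn create_physical_plan(&self, logical_plan: &str, ctx: &dyn PlanningContext) -> String {
        // Only the narrow context is consulted; catalogs are not reachable here.
        let rules = ctx.physical_optimizer_rules().join(",");
        let partitions = ctx.execution_options().target_partitions;
        format!("{logical_plan} partitions={partitions} rules=[{rules}]")
    }
}

fn main() {
    let state = SessionState {
        options: ExecutionOptions { target_partitions: 4 },
        rules: vec!["EnforceDistribution".to_string(), "ProjectionPushdown".to_string()],
    };
    let plan = DefaultPhysicalPlanner.create_physical_plan("TableScan: t", &state);
    // TableScan: t partitions=4 rules=[EnforceDistribution,ProjectionPushdown]
    println!("{plan}");
}
```

Using a trait instead of a concrete struct lets `SessionState` keep owning its data while the planner's crate depends only on the trait; tests could also pass a small fake context.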
Likewise, `create_initial_plan` in `DefaultPhysicalPlanner` needs nothing but `ExecutionOptions`:
datafusion/datafusion/core/src/physical_planner.rs, lines 582 to 585 in 4bed04e:

```rust
let planning_concurrency = session_state
    .config_options()
    .execution
    .planning_concurrency;
```
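A sketch of what a narrower signature might look like, assuming a hypothetical helper `planning_batches` and a stand-in `ExecutionOptions` that models only `planning_concurrency`. Splitting leaf nodes into fixed-size batches is a simplification chosen for illustration, not how `DefaultPhysicalPlanner` actually schedules planning work.

```rust
/// Hypothetical stand-in for `ExecutionOptions`; only the field used here.
#[derive(Debug, Clone)]
pub struct ExecutionOptions {
    pub planning_concurrency: usize,
}

/// Hypothetical helper: group leaf nodes into batches that may be planned
/// concurrently. It receives `&ExecutionOptions`, not the whole session.
pub fn planning_batches(leaves: &[&str], options: &ExecutionOptions) -> Vec<Vec<String>> {
    // Guard against a zero setting, since `chunks(0)` would panic.
    let width = options.planning_concurrency.max(1);
    leaves
        .chunks(width)
        .map(|batch| batch.iter().map(|leaf| leaf.to_string()).collect())
        .collect()
}

fn main() {
    let options = ExecutionOptions { planning_concurrency: 2 };
    let batches = planning_batches(&["scan_a", "scan_b", "scan_c"], &options);
    // [["scan_a", "scan_b"], ["scan_c"]]
    println!("{:?}", batches);
}
```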
Describe alternatives you've considered
No response
Additional context
No response