-
Notifications
You must be signed in to change notification settings - Fork 1.9k
Description
Describe the bug
If we have a plan that looks like this:
Filter: <...>
SomeExtensionNode
then during filter pushdown, the parent filter node is pushed to all of the extension node's children (of which there are none). Since there are none, the filter node is removed from the plan, producing:
SomeExtensionNode
If the filter expression is a constant expression, such as false, it has no column references that overlap the columns returned from prevent_precidate_push_down_columns() and the filter node is removed.
Additionally, combined with #15046, subqueries may get removed entirely from a plan that contains leaf extension node.
To Reproduce
Here's a simple repro:
#[derive(Debug, Hash, Eq, PartialEq)]
struct TestUserNode {
schema: DFSchemaRef,
}
impl PartialOrd for TestUserNode {
fn partial_cmp(&self, _other: &Self) -> Option<std::cmp::Ordering> {
None
}
}
impl TestUserNode {
fn new() -> Self {
let schema = Arc::new(
DFSchema::new_with_metadata(
vec![(None, Field::new("a", DataType::Int64, false).into())],
Default::default(),
)
.unwrap(),
);
Self { schema }
}
}
impl UserDefinedLogicalNodeCore for TestUserNode {
fn name(&self) -> &str {
"test_node"
}
fn inputs(&self) -> Vec<&LogicalPlan> {
vec![]
}
/// Return the output schema of this logical plan node.
fn schema(&self) -> &DFSchemaRef {
&self.schema
}
fn expressions(&self) -> Vec<Expr> {
vec![]
}
fn fmt_for_explain(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
write!(f, "TestUserNode")
}
fn with_exprs_and_inputs(
&self,
exprs: Vec<Expr>,
inputs: Vec<LogicalPlan>,
) -> datafusion::common::Result<Self> {
assert!(exprs.is_empty());
assert!(inputs.is_empty());
Ok(Self {
schema: self.schema.clone(),
})
}
}
async fn main() {
let state = SessionStateBuilder::new()
.with_optimizer_rules(vec![Arc::new(PushDownFilter::new())])
.build();
let ctx = SessionContext::new_with_state(state);
let node = LogicalPlan::Extension(Extension {
node: Arc::new(TestUserNode::new()),
});
let plan = LogicalPlanBuilder::from(node)
.filter(lit(false))
.unwrap()
.build()
.unwrap();
let df = ctx.execute_logical_plan(plan).await.unwrap();
df.explain(true, false).unwrap().show().await.unwrap();
}produces the following output
+----------------------------------------------+----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
------------------------------------------------------------------------------+
| plan_type | plan
|
+----------------------------------------------+----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
------------------------------------------------------------------------------+
| initial_logical_plan | Filter: Boolean(false)
|
| | TestUserNode
|
| logical_plan after inline_table_scan | SAME TEXT AS ABOVE
|
| logical_plan after expand_wildcard_rule | SAME TEXT AS ABOVE
|
| logical_plan after resolve_grouping_function | SAME TEXT AS ABOVE
|
| logical_plan after type_coercion | SAME TEXT AS ABOVE
|
| logical_plan after count_wildcard_rule | SAME TEXT AS ABOVE
|
| analyzed_logical_plan | SAME TEXT AS ABOVE
|
| logical_plan after push_down_filter | TestUserNode
|
| logical_plan after push_down_filter | SAME TEXT AS ABOVE
|
| logical_plan | TestUserNode
|
Changing the filter predicate to a subquery that references the outer a column has the same effect because of #15046.
Expected behavior
I would not expect a filter to get "pushed" into a node that has no children. There should be a check that the node has at least one child before doing the replacement.
Additional context
No response