ff: Preserve cached plan information when pushing projection #17129
adriangb
left a comment
This looks solid to me!
if let Some(new_data_source_exec) =
    new_plan.as_any().downcast_ref::<DataSourceExec>()
I cry every time I see a downcast of a trait to a specific type. I understand why it's necessary in this case, and I think the existing API is to blame. It's worth moving forward with this to fix the (pretty bad, IMO) bug, but this is the sort of thing I hope a future refactor will make unnecessary.
maybe it is worth making a ticket to track the issue / proposed solution if there is not one already
good point, it's #15952, should have put that in the code comment
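For readers unfamiliar with the pattern being discussed, here is a minimal, self-contained sketch of downcasting a trait object to a concrete type through an as_any method. It does not use DataFusion: PlanNode, ScanNode, FilterNode, and cached_constants_of are hypothetical stand-ins, and only std::any::Any and its downcast_ref method are real APIs.

```rust
use std::any::Any;

// Hypothetical stand-in for a plan-node trait (not DataFusion's actual trait).
trait PlanNode {
    // Exposes the node as `Any` so callers can attempt a downcast.
    fn as_any(&self) -> &dyn Any;
}

// Hypothetical scan node that caches information about its output.
struct ScanNode {
    cached_constants: Vec<String>,
}

// Hypothetical node with nothing cached.
struct FilterNode;

impl PlanNode for ScanNode {
    fn as_any(&self) -> &dyn Any {
        self
    }
}

impl PlanNode for FilterNode {
    fn as_any(&self) -> &dyn Any {
        self
    }
}

// Returns the cached constants only when the node is concretely a `ScanNode`.
// Every other node type yields `None`, which is why this pattern is brittle:
// the caller has to know about specific implementations of the trait.
fn cached_constants_of(node: &dyn PlanNode) -> Option<&[String]> {
    node.as_any()
        .downcast_ref::<ScanNode>()
        .map(|scan| scan.cached_constants.as_slice())
}
```

The cost the reviewer points at is visible in cached_constants_of: generic code over the trait ends up branching on one concrete type, so any new node type that caches the same information is silently ignored.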
* Preserve equivalence properties during projection pushdown (#17129) * Adds parquet data diffs --------- Co-authored-by: Matthew Kim <38759997+friendlymatthew@users.noreply.github.com>
    '2023-10-01 00:00:00'::timestamptz AS start_timestamp,
    'prod' as deployment_environment
)
TO 'data/1.parquet';
BTW, this writes to datafusion/sqllogictest/data/1.parquet, which was also checked in. I hit issues with this during an upgrade (the data is different, so the files being written come out different):
I have a fix and will forward port it to main, but wanted to drop a note here in case anyone else saw it
* Preserve equivalence properties during projection pushdown (apache#17129) * Adds parquet data diffs --------- Co-authored-by: Adam Gutglick <adam@spiraldb.com> Co-authored-by: Matthew Kim <38759997+friendlymatthew@users.noreply.github.com>
Which issue does this PR close?
Rationale for this change
This PR modifies DataSourceExec::try_swapping_with_projection to preserve equivalence properties when creating a new DataSourceExec. DataFusion was losing equivalence properties when projection pushdown occurred after filter pushdown.
Consider this example:
COPY (
    SELECT
        '00000000000000000000000000000001' AS trace_id,
        '2023-10-01 00:00:00'::timestamptz AS start_timestamp,
        'prod' as deployment_environment
)
TO 'data/1.parquet';

COPY (
    SELECT
        '00000000000000000000000000000002' AS trace_id,
        '2024-10-01 00:00:00'::timestamptz AS start_timestamp,
        'staging' as deployment_environment
)
TO 'data/2.parquet';

CREATE EXTERNAL TABLE t1 STORED AS PARQUET LOCATION 'data/';

SET datafusion.execution.parquet.pushdown_filters = true;

SELECT deployment_environment
FROM t1
WHERE trace_id = '00000000000000000000000000000002'
ORDER BY start_timestamp, trace_id;

/*
SanityCheckPlan
caused by
Error during planning: Plan: [
"SortPreservingMergeExec: [start_timestamp@1 ASC NULLS LAST, trace_id@2 ASC NULLS LAST]",
" SortExec: expr=[start_timestamp@1 ASC NULLS LAST], preserve_partitioning=[true]",
" DataSourceExec: file_groups={2 groups: [[Users/adriangb/GitHub/datafusion/data/1.parquet], [Users/adriangb/GitHub/datafusion/data/2.parquet]]}, projection=[deployment_environment, start_timestamp, trace_id], file_type=parquet, predicate=trace_id@0 = 00000000000000000000000000000002, pruning_predicate=trace_id_null_count@2 != row_count@3 AND trace_id_min@0 <= 00000000000000000000000000000002 AND 00000000000000000000000000000002 <= trace_id_max@1, required_guarantees=[trace_id in (00000000000000000000000000000002)]"
] does not satisfy order requirements: [start_timestamp@1 ASC NULLS LAST, trace_id@2 ASC NULLS LAST]. Child-0 order: [[start_timestamp@1 ASC NULLS LAST]]
*/

* Filter pushdown establishes that trace_id is a constant
* Projection pushdown creates a new DataSourceExec that lost this cached information
* This causes SanityCheckPlan to fail because it couldn't determine that ordering requirements were satisfied
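To make the failure mode concrete, here is a toy model in plain Rust. It is a sketch under simplifying assumptions, not the code in this PR: Scan, Props, sort_output, satisfies, and the two swap_projection_* functions are hypothetical names, and the real equivalence properties track far more than a list of constant columns. The idea it illustrates is that an ordering requirement on [start_timestamp, trace_id] is met by data sorted on start_timestamp alone, as long as the plan still knows trace_id is constant.

```rust
// Hypothetical scan node: the projected columns plus the columns known to be
// constant (e.g. after `trace_id = '...'` has been pushed into the scan).
#[derive(Clone, Debug)]
struct Scan {
    projection: Vec<String>,
    constants: Vec<String>,
}

// Hypothetical summary of what a node knows about its output.
#[derive(Clone, Debug)]
struct Props {
    ordering: Vec<String>,
    constants: Vec<String>,
}

// A sort establishes the ordering; known constants are inherited from its input.
fn sort_output(input: &Scan, sort_by: &[&str]) -> Props {
    Props {
        ordering: sort_by.iter().map(|s| s.to_string()).collect(),
        constants: input.constants.clone(),
    }
}

// A required ordering is satisfied if, after skipping constant columns, the
// remaining required columns match the known ordering in sequence.
fn satisfies(props: &Props, required: &[&str]) -> bool {
    let mut known = props.ordering.iter();
    for col in required {
        if props.constants.iter().any(|c| c == col) {
            continue; // a single-valued column cannot break the ordering
        }
        match known.next() {
            Some(k) if k == col => {}
            _ => return false,
        }
    }
    true
}

// Buggy variant: builds the new scan from scratch and forgets the constants.
fn swap_projection_lossy(_scan: &Scan, projection: Vec<String>) -> Scan {
    Scan { projection, constants: Vec::new() }
}

// Fixed variant: carries the cached constants over to the new scan.
fn swap_projection_preserving(scan: &Scan, projection: Vec<String>) -> Scan {
    Scan { projection, constants: scan.constants.clone() }
}
```

With trace_id recorded as a constant on the original scan, sorting the output of swap_projection_preserving by start_timestamp satisfies the requirement [start_timestamp, trace_id], while the same sort over the output of swap_projection_lossy does not. That mirrors the "Child-0 order" mismatch in the error above.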