Add LimitPushdown optimization rule and CoalesceBatchesExec fetch #11652
Conversation
…c as supporting limit pushdown
I believe if you merge up from main the CI will pass on this PR
Remove redundant lines in docstring
ozankabak
left a comment
I think this PR looks good and can be merged to unblock other work. The rule itself still has a few unnecessary object creations that can be refactored out, but we can do it as a follow-on PR.
alamb
left a comment
Thank you @alihandroid and @ozankabak -- I think this is a really neat optimization
I spent time reviewing the changes to coalesce batches and the plans, and they all made sense to me. I have some comment/documentation suggestions, but we could also do them as follow-on PRs.
One thing I find interesting is that this is another example of an optimizer pass on ExecutionPlans that mirrors one DataFusion already has for LogicalPlans.
DataFusion has several examples of this already (like projection pushdown) -- and we have several of them in InfluxDB: https://github.com/influxdata/influxdb3_core/tree/main/iox_query/src/physical_optimizer (we also have predicate pushdown and projection pushdown)
I think @crepererum implemented those passes because we make a bunch of ExecutionPlan nodes directly so the LogicalPlan passes can't be used. Maybe we should consider upstreaming them 🤔
physical_plan
01)GlobalLimitExec: skip=0, fetch=10, statistics=[Rows=Exact(8), Bytes=Absent, [(Col[0]:),(Col[1]:),(Col[2]:),(Col[3]:),(Col[4]:),(Col[5]:),(Col[6]:),(Col[7]:),(Col[8]:),(Col[9]:),(Col[10]:)]]
02)--ParquetExec: file_groups={1 group: [[WORKSPACE_ROOT/parquet-testing/data/alltypes_plain.parquet]]}, projection=[id, bool_col, tinyint_col, smallint_col, int_col, bigint_col, float_col, double_col, date_string_col, string_col, timestamp_col], limit=10, statistics=[Rows=Exact(8), Bytes=Absent, [(Col[0]:),(Col[1]:),(Col[2]:),(Col[3]:),(Col[4]:),(Col[5]:),(Col[6]:),(Col[7]:),(Col[8]:),(Col[9]:),(Col[10]:)]]
physical_plan ParquetExec: file_groups={1 group: [[WORKSPACE_ROOT/parquet-testing/data/alltypes_plain.parquet]]}, projection=[id, bool_col, tinyint_col, smallint_col, int_col, bigint_col, float_col, double_col, date_string_col, string_col, timestamp_col], limit=10, statistics=[Rows=Exact(8), Bytes=Absent, [(Col[0]:),(Col[1]:),(Col[2]:),(Col[3]:),(Col[4]:),(Col[5]:),(Col[6]:),(Col[7]:),(Col[8]:),(Col[9]:),(Col[10]:)]]
I agree this is a better plan 👍 -- since it has only a single partition, the global limit is unnecessary
false
}

/// Returns a fetching variant of this `ExecutionPlan` node, if it supports
💯
/// Returns `true` if a limit can be safely pushed down through this
/// `ExecutionPlan` node.
fn supports_limit_pushdown(&self) -> bool {
I found this name somewhat confusing, as it implied to me that it was reporting whether with_fetch was implemented, whereas now I see it refers to whether it is OK to push a limit through the node.
Perhaps we could rename it to something like can_push_limit or preserves_limit?
I don't feel strongly about this
I am also not sure of the best name. If we find something that's clearly better, maybe we can change it in a follow-on before a public release.
01)GlobalLimitExec: skip=0, fetch=10
02)--CsvExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/data/aggregate_test_100_order_by_c1_asc.csv]]}, projection=[c1], output_ordering=[c1@0 ASC NULLS LAST], has_header=true

physical_plan CsvExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/data/aggregate_test_100_order_by_c1_asc.csv]]}, projection=[c1], limit=10, output_ordering=[c1@0 ASC NULLS LAST], has_header=true
The GlobalLimitExec is not needed here because the CsvExec already has a limit and there is a single partition ✅
04)------RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1
05)--------StreamingTableExec: partition_sizes=1, projection=[name, ts], infinite_source=true, output_ordering=[name@0 DESC, ts@1 DESC]
04)------LocalLimitExec: fetch=5
05)--------RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1
I wonder why the limit wasn't pushed into the RepartitionExec here 🤔
I think we haven't covered RepartitionExec yet; it would be a good idea to do it in a follow-on PR.
If I am able to pull the CoalesceBatch into Repartition I think adding support for limit in repartition will become quite easy (as the actual limit code will be reused)
I will wait for a little bit longer to merge this in case anyone has more feedback
/// Merge the limits of the parent and the child. If at least one of them is a
/// [`GlobalLimitExec`], the result is also a [`GlobalLimitExec`]. Otherwise,
/// the result is a [`LocalLimitExec`].
fn merge_limits(
Structurally this could be a method on LimitExec too, though this is totally fine as well.
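For illustration, here is a minimal, self-contained sketch of the merge described in the docstring above. The `LimitKind`/`SimpleLimit` types are hypothetical stand-ins for the real `GlobalLimitExec`/`LocalLimitExec` plan nodes, and `skip` handling is omitted; the actual function in the PR may differ in signature and details.

```rust
// Hypothetical simplified model of a limit node, used only to illustrate
// the merge rule: "global wins" for the kind, smaller fetch wins for fetch.
#[derive(Debug, Clone, Copy, PartialEq)]
enum LimitKind {
    Global,
    Local,
}

#[derive(Debug, Clone, Copy)]
struct SimpleLimit {
    kind: LimitKind,
    fetch: Option<usize>,
}

/// Merge a parent limit with a child limit: the result is `Global` if either
/// side is global, and its fetch is the smaller of the two fetches.
fn merge_limits(parent: SimpleLimit, child: SimpleLimit) -> SimpleLimit {
    let kind = if parent.kind == LimitKind::Global || child.kind == LimitKind::Global {
        LimitKind::Global
    } else {
        LimitKind::Local
    };
    let fetch = match (parent.fetch, child.fetch) {
        // Both limits carry a fetch: only the smaller one can ever be reached.
        (Some(p), Some(c)) => Some(p.min(c)),
        // Otherwise keep whichever fetch exists, if any.
        (p, c) => p.or(c),
    };
    SimpleLimit { kind, fetch }
}
```

Making this a method on the limit type (as suggested above) would be a purely structural change; the merge logic itself stays the same.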
Which issue does this PR close?
Closes #9792.
Rationale for this change
Physical plans can be optimized further by pushing `GlobalLimitExec` and `LocalLimitExec` down through certain nodes, or by using versions of their child nodes with fetch limits, without changing the result. This reduces unnecessary data transfer and processing, making plan execution more efficient. `CoalesceBatchesExec` can also benefit from this improvement, and as such, fetch limit functionality is implemented for it.

For example,
can be turned into
and
can be turned into
without changing the result, but using fewer resources and finishing faster.
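One concrete instance, taken from the sqllogictest diffs reviewed above: a plan such as

01)GlobalLimitExec: skip=0, fetch=10
02)--CsvExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/data/aggregate_test_100_order_by_c1_asc.csv]]}, projection=[c1], output_ordering=[c1@0 ASC NULLS LAST], has_header=true

collapses to a single scan with the limit folded in:

CsvExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/data/aggregate_test_100_order_by_c1_asc.csv]]}, projection=[c1], limit=10, output_ordering=[c1@0 ASC NULLS LAST], has_header=true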
The physical plan in the following excerpt
datafusion/datafusion/sqllogictest/test_files/repartition.slt
Lines 116 to 129 in ecf5323
will turn into
Other examples can be found in the tests provided in `limit_pushdown.rs` and other .slt tests.

What changes are included in this PR?
Implement the `LimitPushdown` rule:
- Add to the `ExecutionPlan` trait (a minimal sketch follows after this list):
  - `with_fetch(&self, fetch: Option<usize>) -> Option<Arc<dyn ExecutionPlan>>`: Returns a fetching version of the node if supported, `None` otherwise. The default implementation returns `None`.
  - `supports_limit_pushdown(&self) -> bool`: Returns `true` if a node supports limit pushdown. The default implementation returns `false`.
- Add fetch support to `CoalesceBatchesExec`: a `fetch` field and a `with_fetch` implementation.
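For reference, a minimal sketch of the two trait additions described above. `ExecutionPlanSketch` is a hypothetical stand-in trait used only for illustration; the actual methods are added to DataFusion's `ExecutionPlan` trait and may carry different docstrings.

```rust
use std::sync::Arc;

// Hypothetical stand-in for DataFusion's `ExecutionPlan` trait, showing only
// the two methods added by this PR together with their default implementations.
trait ExecutionPlanSketch {
    /// Returns a fetching variant of this node if it can apply a fetch limit
    /// itself, `None` otherwise. The default says "not supported".
    fn with_fetch(&self, _fetch: Option<usize>) -> Option<Arc<dyn ExecutionPlanSketch>> {
        None
    }

    /// Returns `true` if a limit can be safely pushed down through this node
    /// without changing the query result. The default is conservative.
    fn supports_limit_pushdown(&self) -> bool {
        false
    }
}
```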
Are these changes tested?

Unit tests are provided for `LimitPushdown` and the new fetching support for `CoalesceBatchesExec`.

Are there any user-facing changes?
No. The changes only affect performance.