Conversation

@alihandroid (Contributor)

Which issue does this PR close?

Closes #9792.

Rationale for this change

Physical plans can be optimized further by pushing GlobalLimitExec and LocalLimitExec down through certain nodes, or by using versions of their child nodes with fetch limits, without changing the result. This reduces unnecessary data transfer and processing, resulting in more efficient plan execution.

CoalesceBatchesExec can also benefit from this improvement, so fetch limit functionality is implemented for it as well.

For example,

GlobalLimitExec: skip=0, fetch=5
  StreamingTableExec: partition_sizes=1, projection=[c1, c2, c3], infinite_source=true

can be turned into

StreamingTableExec: partition_sizes=1, projection=[c1, c2, c3], infinite_source=true, fetch=5

and

GlobalLimitExec: skip=0, fetch=5
  CoalescePartitionsExec
    FilterExec: c3@2 > 0
      RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1
        StreamingTableExec: partition_sizes=1, projection=[c1, c2, c3], infinite_source=true

can be turned into

GlobalLimitExec: skip=0, fetch=5
  CoalescePartitionsExec
    LocalLimitExec: fetch=5
      FilterExec: c3@2 > 0
        RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1
          StreamingTableExec: partition_sizes=1, projection=[c1, c2, c3], infinite_source=true

without changing the result, while using fewer resources and finishing faster.

The physical plan in the following excerpt

query TT
EXPLAIN SELECT c1, c2, c3 FROM sink_table WHERE c3 > 0 LIMIT 5;
----
logical_plan
01)Limit: skip=0, fetch=5
02)--Filter: sink_table.c3 > Int16(0)
03)----TableScan: sink_table projection=[c1, c2, c3]
physical_plan
01)GlobalLimitExec: skip=0, fetch=5
02)--CoalescePartitionsExec
03)----CoalesceBatchesExec: target_batch_size=8192
04)------FilterExec: c3@2 > 0
05)--------RepartitionExec: partitioning=RoundRobinBatch(3), input_partitions=1
06)----------StreamingTableExec: partition_sizes=1, projection=[c1, c2, c3], infinite_source=true

will turn into

01)GlobalLimitExec: skip=0, fetch=5
02)--CoalescePartitionsExec
03)----CoalesceBatchesExec: target_batch_size=8192, fetch=5
04)------FilterExec: c3@2 > 0
05)--------RepartitionExec: partitioning=RoundRobinBatch(3), input_partitions=1
06)----------StreamingTableExec: partition_sizes=1, projection=[c1, c2, c3], infinite_source=true

Other examples can be found in the tests provided in limit_pushdown.rs and in other .slt tests.

What changes are included in this PR?

Implement LimitPushdown Rule:

  • Introduced new APIs in the ExecutionPlan trait (see the sketch below):
    • with_fetch(&self, fetch: Option<usize>) -> Option<Arc<dyn ExecutionPlan>>: Returns a fetching version of the node if supported, None otherwise. The default implementation returns None.
    • supports_limit_pushdown(&self) -> bool: Returns true if a limit can be safely pushed down through the node. The default implementation returns false.
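
A minimal sketch of what these two trait additions might look like, using a simplified stand-in for DataFusion's ExecutionPlan trait (the real trait has many more items; only the new methods and their default bodies are shown):

use std::sync::Arc;

/// Simplified stand-in for DataFusion's `ExecutionPlan` trait, showing only
/// the two methods added in this PR together with their default bodies.
pub trait ExecutionPlan {
    /// Returns a variant of this node that stops producing rows after `fetch`
    /// rows, if the node supports that; `None` means no fetching variant exists.
    fn with_fetch(&self, _fetch: Option<usize>) -> Option<Arc<dyn ExecutionPlan>> {
        // Default: the node has no fetch-limited variant.
        None
    }

    /// Returns `true` if a limit can be safely pushed down through this node,
    /// i.e. limiting the child's output does not change this node's result.
    fn supports_limit_pushdown(&self) -> bool {
        // Default: be conservative and keep the limit above this node.
        false
    }
}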

Add fetch support to CoalesceBatchesExec:

  • Added a fetch field and a with_fetch implementation
  • Implemented the fetch limit functionality (a sketch of the idea follows below)
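
The fetch limit itself boils down to tracking how many rows have been emitted and truncating the batch that crosses the limit. A minimal sketch of that idea (the helper name and signature are hypothetical; the actual logic lives inside CoalesceBatchesExec's stream implementation):

use arrow::record_batch::RecordBatch;

/// Hypothetical helper illustrating fetch-limit truncation: emit batches until
/// `fetch` rows have been produced, slicing the batch that crosses the boundary.
fn apply_fetch(
    batch: RecordBatch,
    fetched_so_far: &mut usize,
    fetch: usize,
) -> Option<RecordBatch> {
    if *fetched_so_far >= fetch {
        return None; // the limit was already reached; drop the batch
    }
    let remaining = fetch - *fetched_so_far;
    if batch.num_rows() <= remaining {
        *fetched_so_far += batch.num_rows();
        Some(batch) // the whole batch fits under the limit
    } else {
        *fetched_so_far = fetch;
        Some(batch.slice(0, remaining)) // zero-copy truncation to the limit
    }
}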

Are these changes tested?

Unit tests are provided for LimitPushdown and the new fetching support for CoalesceBatchesExec.

Are there any user-facing changes?

No. The changes only affect performance.

@alamb (Contributor) commented Jul 25, 2024

I believe if you merge up from main, the CI will pass on this PR.

github-actions bot added the optimizer (Optimizer rules) label on Jul 26, 2024

@ozankabak (Contributor) left a comment

I think this PR looks good and can be merged to unblock other work. The rule itself still has a few unnecessary object creations that can be refactored out, but we can do it as a follow-on PR.

@alamb (Contributor) left a comment

Thank you @alihandroid and @ozankabak -- I think this is a really neat optimization

I spent time reviewing the changes to coalesce batches and the plans, and they all made sense to me. I have some comment / documentation suggestions, but we could do them as follow-on PRs as well.

One thing I find interesting is that this is another example of an optimizer pass on ExecutionPlans that mirrors one DataFusion already has for LogicalPlans.

DataFusion has several examples of this already (like projection pushdown) -- and we have several of them in InfluxDB: https://github.com/influxdata/influxdb3_core/tree/main/iox_query/src/physical_optimizer (we also have predicate pushdown and projection pushdown)

I think @crepererum implemented those passes because we make a bunch of ExecutionPlan nodes directly so the LogicalPlan passes can't be used. Maybe we should consider upstreaming them 🤔

physical_plan
01)GlobalLimitExec: skip=0, fetch=10, statistics=[Rows=Exact(8), Bytes=Absent, [(Col[0]:),(Col[1]:),(Col[2]:),(Col[3]:),(Col[4]:),(Col[5]:),(Col[6]:),(Col[7]:),(Col[8]:),(Col[9]:),(Col[10]:)]]
02)--ParquetExec: file_groups={1 group: [[WORKSPACE_ROOT/parquet-testing/data/alltypes_plain.parquet]]}, projection=[id, bool_col, tinyint_col, smallint_col, int_col, bigint_col, float_col, double_col, date_string_col, string_col, timestamp_col], limit=10, statistics=[Rows=Exact(8), Bytes=Absent, [(Col[0]:),(Col[1]:),(Col[2]:),(Col[3]:),(Col[4]:),(Col[5]:),(Col[6]:),(Col[7]:),(Col[8]:),(Col[9]:),(Col[10]:)]]
physical_plan ParquetExec: file_groups={1 group: [[WORKSPACE_ROOT/parquet-testing/data/alltypes_plain.parquet]]}, projection=[id, bool_col, tinyint_col, smallint_col, int_col, bigint_col, float_col, double_col, date_string_col, string_col, timestamp_col], limit=10, statistics=[Rows=Exact(8), Bytes=Absent, [(Col[0]:),(Col[1]:),(Col[2]:),(Col[3]:),(Col[4]:),(Col[5]:),(Col[6]:),(Col[7]:),(Col[8]:),(Col[9]:),(Col[10]:)]]

I agree this is a better plan 👍 Since there is only a single partition, the global limit is unnecessary.

false
}

/// Returns a fetching variant of this `ExecutionPlan` node, if it supports

💯


/// Returns `true` if a limit can be safely pushed down through this
/// `ExecutionPlan` node.
fn supports_limit_pushdown(&self) -> bool {

I found this name somewhat confusing, as it implied to me that it reports whether with_fetch is implemented, whereas I now see it refers to whether it is OK to push a limit through the node.

Perhaps we could rename it to something different. Perhaps can_push_limit or preserves_limit?

I don't feel strongly about this

I am also not sure of the best name. If we find something that's clearly better, maybe we can change it in a follow-on before a public release.

01)GlobalLimitExec: skip=0, fetch=10
02)--CsvExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/data/aggregate_test_100_order_by_c1_asc.csv]]}, projection=[c1], output_ordering=[c1@0 ASC NULLS LAST], has_header=true

physical_plan CsvExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/data/aggregate_test_100_order_by_c1_asc.csv]]}, projection=[c1], limit=10, output_ordering=[c1@0 ASC NULLS LAST], has_header=true

The GlobalLimitExec is not needed here because the CsvExec already has a limit and there is a single partition ✅

04)------RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1
05)--------StreamingTableExec: partition_sizes=1, projection=[name, ts], infinite_source=true, output_ordering=[name@0 DESC, ts@1 DESC]
04)------LocalLimitExec: fetch=5
05)--------RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1

I wonder why the limit wasn't pushed into the RepartitionExec here 🤔

I think we haven't covered RepartitionExec yet; it would be a good idea to do it in a follow-on PR.

If I am able to pull the CoalesceBatch into Repartition, I think adding support for limit in repartition will become quite easy (as the actual limit code will be reused).

@ozankabak (Contributor)

I will wait a little bit longer to merge this in case anyone has more feedback.

/// Merge the limits of the parent and the child. If at least one of them is a
/// [`GlobalLimitExec`], the result is also a [`GlobalLimitExec`]. Otherwise,
/// the result is a [`LocalLimitExec`].
fn merge_limits(

Structurally, this could be a method on LimitExec, though this is totally fine too.
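
For intuition, here is a simplified sketch of how the fetch part of such a merge could work, ignoring skip handling entirely (the function name and signature are illustrative, not the PR's actual merge_limits):

/// Illustrative only: the combined fetch of two stacked limits is the smaller
/// of the two, and a missing fetch on one side defers to the other. The real
/// merge_limits additionally handles skip and chooses between GlobalLimitExec
/// and LocalLimitExec as described in the doc comment above.
fn merged_fetch(parent_fetch: Option<usize>, child_fetch: Option<usize>) -> Option<usize> {
    match (parent_fetch, child_fetch) {
        (Some(parent), Some(child)) => Some(parent.min(child)),
        (Some(parent), None) => Some(parent),
        (None, child) => child,
    }
}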

Labels

core (Core DataFusion crate), optimizer (Optimizer rules), sqllogictest (SQL Logic Tests (.slt))
