
Add CoalesceBatchesExec to NestedLoopJoinExec #16328

@UBarney

Description


Is your feature request related to a problem or challenge?

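The NestedLoopJoinExec operator can produce output batches with far fewer rows than the configured batch_size. Unlike FilterExec and HashJoinExec, it is not followed by a CoalesceBatchesExec, so these small batches flow straight into downstream operators. To improve performance, we should add a CoalesceBatchesExec operator after it.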
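For context, the buffering idea behind CoalesceBatchesExec can be sketched as follows. This is a hypothetical, simplified model, not DataFusion's implementation: `ToyCoalescer` and `coalesce` are illustrative names, rows are plain `i64` values instead of Arrow `RecordBatch`es, and a batch is emitted as soon as the buffer reaches the target (so it may exceed it).

```rust
/// Hypothetical toy model of batch coalescing: small input batches are
/// buffered until at least `target_batch_size` rows are available, then
/// emitted together as one larger batch.
struct ToyCoalescer {
    target_batch_size: usize,
    buffer: Vec<i64>,
}

impl ToyCoalescer {
    fn new(target_batch_size: usize) -> Self {
        Self { target_batch_size, buffer: Vec::new() }
    }

    /// Buffers `batch`; returns everything buffered once the target is reached.
    fn push(&mut self, batch: &[i64]) -> Option<Vec<i64>> {
        self.buffer.extend_from_slice(batch);
        if self.buffer.len() >= self.target_batch_size {
            Some(std::mem::take(&mut self.buffer))
        } else {
            None
        }
    }

    /// Flushes any leftover rows when the input stream ends.
    fn finish(&mut self) -> Option<Vec<i64>> {
        if self.buffer.is_empty() {
            None
        } else {
            Some(std::mem::take(&mut self.buffer))
        }
    }
}

/// Runs a whole stream of batches through the toy coalescer.
fn coalesce(batches: &[Vec<i64>], target_batch_size: usize) -> Vec<Vec<i64>> {
    let mut coalescer = ToyCoalescer::new(target_batch_size);
    let mut out: Vec<Vec<i64>> = batches.iter().filter_map(|b| coalescer.push(b)).collect();
    out.extend(coalescer.finish());
    out
}

fn main() {
    // 6 rows in 3 batches, matching the NLJ output counts in the query below
    // (the even 2/2/2 split is an assumption made for illustration).
    let small = vec![vec![1, 2], vec![3, 4], vec![5, 6]];
    let out = coalesce(&small, 8192);
    let sizes: Vec<usize> = out.iter().map(Vec::len).collect();
    println!("{} batch(es), sizes {:?}", out.len(), sizes);
}
```

Downstream operators then see one batch instead of several tiny ones, which is the per-batch overhead this issue wants to avoid.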

When running the following SQL query, the metrics show that NestedLoopJoinExec (NLJ) produces very small batches: output_rows=6 across output_batches=3, an average of 2 rows per batch against a target batch size of 8192.

explain analyze select count(*) from range(2,8194) as t1 join range(2,24576) as t2 on t1.value * t2.value < 10;
+-------------------+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
| plan_type         | plan                                                                                                                                                                                                                                                                        |
+-------------------+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
| Plan with Metrics | ProjectionExec: expr=[count(Int64(1))@0 as count(*)], metrics=[output_rows=1, elapsed_compute=525ns]                                                                                                                                                                        |
|                   |   AggregateExec: mode=Final, gby=[], aggr=[count(Int64(1))], metrics=[output_rows=1, elapsed_compute=17.663µs]                                                                                                                                                              |
|                   |     CoalescePartitionsExec, metrics=[output_rows=24, elapsed_compute=17.979µs]                                                                                                                                                                                              |
|                   |       AggregateExec: mode=Partial, gby=[], aggr=[count(Int64(1))], metrics=[output_rows=24, elapsed_compute=2.412497ms]                                                                                                                                                     |
|                   |         ProjectionExec: expr=[], metrics=[output_rows=6, elapsed_compute=1.17µs]                                                                                                                                                                                            |
|                   |           NestedLoopJoinExec: join_type=Inner, filter=value@0 * value@1 < 10, metrics=[output_rows=6, build_input_batches=1, build_input_rows=8192, input_batches=3, input_rows=24574, output_batches=3, build_mem_used=65632, build_time=32.303µs, join_time=2.490000275s] |
|                   |             LazyMemoryExec: partitions=1, batch_generators=[range: start=2, end=8194, batch_size=8192], metrics=[]                                                                                                                                                          |
|                   |             RepartitionExec: partitioning=RoundRobinBatch(24), input_partitions=1, metrics=[fetch_time=42.542µs, repartition_time=1ns, send_time=396.529µs]                                                                                                                 |
|                   |               LazyMemoryExec: partitions=1, batch_generators=[range: start=2, end=24576, batch_size=8192], metrics=[]                                                                                                                                                       |
|                   |                                                                                                                                                                                                                                                                             |
+-------------------+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
1 row(s) fetched.
Elapsed 0.834 seconds.
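To see how a nested loop join ends up emitting such small batches, here is a toy model in Rust. It assumes, hypothetically, that the join emits exactly one output batch per probe batch; that is consistent with input_batches=3 and output_batches=3 above but is not taken from DataFusion's code, and `nested_loop_join` is an illustrative name. With small stand-in ranges, the filter `b * p < 10` matches the same 6 pairs as the real query: (2,2), (2,3), (2,4), (3,2), (3,3), (4,2).

```rust
/// Hypothetical toy nested loop join: the build side is held in memory and
/// every probe row is compared against every build row. This sketch emits
/// one output batch per probe batch, even when that batch is tiny or empty.
fn nested_loop_join<F>(build: &[i64], probe_batches: &[Vec<i64>], filter: F) -> Vec<Vec<(i64, i64)>>
where
    F: Fn(i64, i64) -> bool,
{
    probe_batches
        .iter()
        .map(|probe| {
            let mut out = Vec::new();
            for &p in probe {
                for &b in build {
                    if filter(b, p) {
                        out.push((b, p));
                    }
                }
            }
            out
        })
        .collect()
}

fn main() {
    // Small stand-ins for range(2, 10) on both sides; the probe side is
    // split into two batches.
    let build: Vec<i64> = (2..10).collect();
    let probe_batches = vec![(2..6).collect::<Vec<i64>>(), (6..10).collect()];
    let batches = nested_loop_join(&build, &probe_batches, |b, p| b * p < 10);
    let sizes: Vec<usize> = batches.iter().map(Vec::len).collect();
    // The selective filter leaves each output batch far below any sensible target size.
    println!("output batch sizes: {:?}", sizes);
}
```

In this model all 6 matches land in the first output batch and the second is empty, so without a coalescing step the consumer receives batches that are almost entirely overhead.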

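Additional context

The CoalesceBatches physical optimizer rule currently wraps only FilterExec, HashJoinExec, and non-round-robin RepartitionExec in a CoalesceBatchesExec; NestedLoopJoinExec is not covered: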

// Only these operator types are wrapped; NestedLoopJoinExec is not matched,
// so its output is never coalesced.
let wrap_in_coalesce = plan_any.downcast_ref::<FilterExec>().is_some()
    || plan_any.downcast_ref::<HashJoinExec>().is_some()
    // Don't need to add CoalesceBatchesExec after a round robin RepartitionExec
    || plan_any
        .downcast_ref::<RepartitionExec>()
        .map(|repart_exec| {
            !matches!(
                repart_exec.partitioning().clone(),
                Partitioning::RoundRobinBatch(_)
            )
        })
        .unwrap_or(false);
if wrap_in_coalesce {
    Ok(Transformed::yes(Arc::new(CoalesceBatchesExec::new(
        plan,
        target_batch_size,
    ))))
} else {
    Ok(Transformed::no(plan))
}
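The proposed change presumably amounts to one more clause in the predicate above, along the lines of `|| plan_any.downcast_ref::<NestedLoopJoinExec>().is_some()`. The self-contained sketch below models that decision on a toy plan tree rather than DataFusion's `ExecutionPlan` trait. The `Plan` and `Partitioning` enums and the `transform_up` function are hypothetical stand-ins, written to mirror the bottom-up rewrite and its `Transformed::yes` / `Transformed::no` branches.

```rust
/// Hypothetical stand-in for a partitioning scheme.
#[derive(Debug, Clone, PartialEq)]
enum Partitioning {
    RoundRobinBatch,
    Hash,
}

/// Hypothetical stand-in for a physical plan; only the node kinds relevant
/// to the coalescing decision are modeled.
#[derive(Debug, Clone, PartialEq)]
enum Plan {
    Scan,
    Filter(Box<Plan>),
    HashJoin(Box<Plan>, Box<Plan>),
    NestedLoopJoin(Box<Plan>, Box<Plan>),
    Repartition(Partitioning, Box<Plan>),
    CoalesceBatches(Box<Plan>),
}

/// Mirrors the `wrap_in_coalesce` predicate, with NestedLoopJoin added as proposed.
fn wrap_in_coalesce(plan: &Plan) -> bool {
    match plan {
        Plan::Filter(_) | Plan::HashJoin(..) | Plan::NestedLoopJoin(..) => true,
        // No CoalesceBatches is needed after a round robin repartition.
        Plan::Repartition(p, _) => !matches!(p, Partitioning::RoundRobinBatch),
        _ => false,
    }
}

/// Bottom-up rewrite: children are rewritten first, then the node itself
/// is wrapped if the predicate matches.
fn transform_up(plan: Plan) -> Plan {
    let plan = match plan {
        Plan::Scan => Plan::Scan,
        Plan::Filter(c) => Plan::Filter(Box::new(transform_up(*c))),
        Plan::HashJoin(l, r) => {
            Plan::HashJoin(Box::new(transform_up(*l)), Box::new(transform_up(*r)))
        }
        Plan::NestedLoopJoin(l, r) => {
            Plan::NestedLoopJoin(Box::new(transform_up(*l)), Box::new(transform_up(*r)))
        }
        Plan::Repartition(p, c) => Plan::Repartition(p, Box::new(transform_up(*c))),
        Plan::CoalesceBatches(c) => Plan::CoalesceBatches(Box::new(transform_up(*c))),
    };
    if wrap_in_coalesce(&plan) {
        Plan::CoalesceBatches(Box::new(plan))
    } else {
        plan
    }
}

fn main() {
    // Roughly the shape of the NLJ query plan: a scan joined with a
    // round-robin repartitioned scan.
    let plan = Plan::NestedLoopJoin(
        Box::new(Plan::Scan),
        Box::new(Plan::Repartition(Partitioning::RoundRobinBatch, Box::new(Plan::Scan))),
    );
    println!("{:?}", transform_up(plan));
}
```

Applied to a tree shaped like the NLJ query plan, the rewrite wraps the join while leaving the round-robin repartition untouched, which is the behavior the comment in the original rule describes.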

By contrast, for an equi-join the plan shows a CoalesceBatchesExec added above the HashJoinExec:

explain analyze select * from range(2,8194) as t1 join range(2,24576) as t2 using(value);
+-------------------+------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
| plan_type         | plan                                                                                                                                                                                                                                                                                                 |
+-------------------+------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
| Plan with Metrics | CoalesceBatchesExec: target_batch_size=8192, metrics=[output_rows=8192, elapsed_compute=10.689µs]                                                                                                                                                                                                    |
|                   |   HashJoinExec: mode=Partitioned, join_type=Inner, on=[(value@0, value@0)], projection=[value@0], metrics=[output_rows=8192, build_input_batches=24, build_input_rows=8192, input_batches=24, input_rows=24574, output_batches=24, build_mem_used=275776, build_time=446.567µs, join_time=216.082µs] |
|                   |     CoalesceBatchesExec: target_batch_size=8192, metrics=[output_rows=8192, elapsed_compute=20.988µs]                                                                                                                                                                                                |
|                   |       RepartitionExec: partitioning=Hash([value@0], 24), input_partitions=24, metrics=[fetch_time=1.975326ms, repartition_time=45.457µs, send_time=6.671µs]                                                                                                                                          |
|                   |         RepartitionExec: partitioning=RoundRobinBatch(24), input_partitions=1, metrics=[fetch_time=15.114µs, repartition_time=1ns, send_time=976ns]                                                                                                                                                  |
|                   |           LazyMemoryExec: partitions=1, batch_generators=[range: start=2, end=8194, batch_size=8192], metrics=[]                                                                                                                                                                                     |
|                   |     CoalesceBatchesExec: target_batch_size=8192, metrics=[output_rows=24574, elapsed_compute=60.501µs]                                                                                                                                                                                               |
|                   |       RepartitionExec: partitioning=Hash([value@0], 24), input_partitions=24, metrics=[fetch_time=1.925668ms, repartition_time=153.331µs, send_time=34.965µs]                                                                                                                                        |
|                   |         RepartitionExec: partitioning=RoundRobinBatch(24), input_partitions=1, metrics=[fetch_time=28.387µs, repartition_time=1ns, send_time=1.025µs]                                                                                                                                                |
|                   |           LazyMemoryExec: partitions=1, batch_generators=[range: start=2, end=24576, batch_size=8192], metrics=[]                                                                                                                                                                                    |
|                   |                                                                                                                                                                                                                                                                                                      |
+-------------------+------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
1 row(s) fetched.
Elapsed 0.002 seconds.

Metadata

Assignees

No one assigned

Labels

enhancement (New feature or request)
