Labels: enhancement (New feature or request)
Description
Is your feature request related to a problem or challenge?
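The NestedLoopJoinExec operator can produce output batches with far fewer rows than the configured batch_size, so every downstream operator pays its per-batch overhead on nearly empty batches. To improve performance, we should add a CoalesceBatchesExec operator after it, as is already done for FilterExec and HashJoinExec.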
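To make the proposal concrete, here is a simplified model of what coalescing does: small incoming batches are buffered until at least `target_batch_size` rows have accumulated, and the remainder is flushed at end of input. This is only a sketch under stated assumptions: `Batch` is a hypothetical stand-in (a plain `Vec<i64>`) for an Arrow `RecordBatch`, and `Coalescer`/`coalesce` are illustrative names, not DataFusion's actual implementation, whose exact splitting policy is not shown here.

```rust
/// Hypothetical stand-in for an Arrow RecordBatch: just a vector of row values.
type Batch = Vec<i64>;

/// Buffers incoming batches and emits a combined batch once at least
/// `target_batch_size` rows have accumulated (illustrative only).
struct Coalescer {
    target_batch_size: usize,
    buffer: Batch,
}

impl Coalescer {
    fn new(target_batch_size: usize) -> Self {
        Self { target_batch_size, buffer: Vec::new() }
    }

    /// Append a batch; return the buffered rows once the target is reached.
    fn push(&mut self, batch: Batch) -> Option<Batch> {
        self.buffer.extend(batch);
        if self.buffer.len() >= self.target_batch_size {
            Some(std::mem::take(&mut self.buffer))
        } else {
            None
        }
    }

    /// Flush whatever is left at end of input.
    fn finish(&mut self) -> Option<Batch> {
        if self.buffer.is_empty() {
            None
        } else {
            Some(std::mem::take(&mut self.buffer))
        }
    }
}

/// Run a whole stream of batches through the coalescer.
fn coalesce(batches: Vec<Batch>, target_batch_size: usize) -> Vec<Batch> {
    let mut coalescer = Coalescer::new(target_batch_size);
    let mut out = Vec::new();
    for batch in batches {
        if let Some(full) = coalescer.push(batch) {
            out.push(full);
        }
    }
    if let Some(rest) = coalescer.finish() {
        out.push(rest);
    }
    out
}

fn main() {
    // Six rows spread over three tiny batches (an assumed split matching
    // the NLJ totals output_rows=6, output_batches=3).
    let small = vec![vec![1, 2], vec![3, 4], vec![5, 6]];
    let out = coalesce(small, 8192);
    println!("{} batch(es), {} rows", out.len(), out[0].len());
}
```

Note that this simple policy can emit batches larger than the target, because it only checks the size after appending each input batch.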
When running the following SQL query, the metrics show that the NestedLoopJoinExec (NLJ) operator produces very small batches: output_rows=6 in output_batches=3, an average of two rows per batch against a batch_size of 8192.
explain analyze select count(*) from range(2,8194) as t1 join range(2,24576) as t2 on t1.value * t2.value < 10;
Plan with Metrics:
ProjectionExec: expr=[count(Int64(1))@0 as count(*)], metrics=[output_rows=1, elapsed_compute=525ns]
  AggregateExec: mode=Final, gby=[], aggr=[count(Int64(1))], metrics=[output_rows=1, elapsed_compute=17.663µs]
    CoalescePartitionsExec, metrics=[output_rows=24, elapsed_compute=17.979µs]
      AggregateExec: mode=Partial, gby=[], aggr=[count(Int64(1))], metrics=[output_rows=24, elapsed_compute=2.412497ms]
        ProjectionExec: expr=[], metrics=[output_rows=6, elapsed_compute=1.17µs]
          NestedLoopJoinExec: join_type=Inner, filter=value@0 * value@1 < 10, metrics=[output_rows=6, build_input_batches=1, build_input_rows=8192, input_batches=3, input_rows=24574, output_batches=3, build_mem_used=65632, build_time=32.303µs, join_time=2.490000275s]
            LazyMemoryExec: partitions=1, batch_generators=[range: start=2, end=8194, batch_size=8192], metrics=[]
            RepartitionExec: partitioning=RoundRobinBatch(24), input_partitions=1, metrics=[fetch_time=42.542µs, repartition_time=1ns, send_time=396.529µs]
              LazyMemoryExec: partitions=1, batch_generators=[range: start=2, end=24576, batch_size=8192], metrics=[]
1 row(s) fetched.
Elapsed 0.834 seconds.
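The tiny output follows directly from the filter's selectivity. Both inputs start at 2, so only the pairs (2,2), (2,3), (2,4), (3,2), (3,3) and (4,2) have a product below 10, while the join evaluates 8192 × 24574 candidate pairs. The sketch below counts the matches analytically instead of with a nested loop; `count_pairs_below` is a hypothetical helper written for this illustration and assumes positive values and end-exclusive ranges, as range(start, end) is in the plan above.

```rust
/// Count pairs (a, b) with a in [a_start, a_end) and b in [b_start, b_end)
/// such that a * b < limit. Assumes all values are positive.
fn count_pairs_below(a_start: i64, a_end: i64, b_start: i64, b_end: i64, limit: i64) -> i64 {
    let mut count = 0;
    for a in a_start..a_end {
        // Smallest product for this `a`; once it reaches `limit`,
        // no larger `a` can produce a match either.
        if a * b_start >= limit {
            break;
        }
        // Largest b with a * b < limit, clamped to the end of the b range.
        let b_max = ((limit - 1) / a).min(b_end - 1);
        if b_max >= b_start {
            count += b_max - b_start + 1;
        }
    }
    count
}

fn main() {
    // range(2, 8194) x range(2, 24576) with filter t1.value * t2.value < 10
    let matches = count_pairs_below(2, 8194, 2, 24576, 10);
    // Every candidate pair the nested loop join has to evaluate.
    let candidates: i64 = (8194 - 2) * (24576 - 2);
    println!("matching rows: {matches}, candidate pairs: {candidates}");
}
```

The match count agrees with output_rows=6, which is why almost every output batch of the NLJ is close to empty.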
Describe the solution you'd like
No response
Describe alternatives you've considered
No response
Additional context
datafusion/datafusion/physical-optimizer/src/coalesce_batches.rs
Lines 63 to 79 in d00a085
let wrap_in_coalesce = plan_any.downcast_ref::<FilterExec>().is_some()
    || plan_any.downcast_ref::<HashJoinExec>().is_some()
    // Don't need to add CoalesceBatchesExec after a round robin RepartitionExec
    || plan_any
        .downcast_ref::<RepartitionExec>()
        .map(|repart_exec| {
            !matches!(
                repart_exec.partitioning().clone(),
                Partitioning::RoundRobinBatch(_)
            )
        })
        .unwrap_or(false);
if wrap_in_coalesce {
    Ok(Transformed::yes(Arc::new(CoalesceBatchesExec::new(
        plan,
        target_batch_size,
    ))))
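A minimal sketch of the proposed change, assuming it is enough to add a NestedLoopJoinExec check to the existing predicate. To keep it self-contained, the operator structs and the `Partitioning` enum below are hypothetical stand-ins for the DataFusion types (not their real definitions or import paths), and `wrap_in_coalesce` is written as a free function over `&dyn Any` so the `downcast_ref` chain has the same shape as the excerpt.

```rust
use std::any::Any;

// Hypothetical stand-ins for DataFusion's operator types, used only so
// that the predicate below compiles on its own.
#[allow(dead_code)]
struct FilterExec;
#[allow(dead_code)]
struct HashJoinExec;
struct NestedLoopJoinExec;
#[allow(dead_code)]
enum Partitioning {
    RoundRobinBatch(usize),
    Hash(usize),
}
struct RepartitionExec {
    partitioning: Partitioning,
}

/// Same structure as the condition in coalesce_batches.rs, with
/// NestedLoopJoinExec added as the proposed extra case.
fn wrap_in_coalesce(plan_any: &dyn Any) -> bool {
    plan_any.downcast_ref::<FilterExec>().is_some()
        || plan_any.downcast_ref::<HashJoinExec>().is_some()
        // Proposed: also coalesce the (often tiny) NLJ output batches.
        || plan_any.downcast_ref::<NestedLoopJoinExec>().is_some()
        // Still skip round robin repartitions, as the original code does.
        || plan_any
            .downcast_ref::<RepartitionExec>()
            .map(|repart| !matches!(repart.partitioning, Partitioning::RoundRobinBatch(_)))
            .unwrap_or(false)
}

fn main() {
    let round_robin = RepartitionExec { partitioning: Partitioning::RoundRobinBatch(24) };
    println!("NLJ: {}", wrap_in_coalesce(&NestedLoopJoinExec));
    println!("round robin repartition: {}", wrap_in_coalesce(&round_robin));
}
```

Keeping the check as one boolean chain means the round robin exclusion is unaffected by the new case.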
For comparison, CoalesceBatchesExec is already added after HashJoinExec (and after each hash RepartitionExec), as this equi-join query shows:
explain analyze select * from range(2,8194) as t1 join range(2,24576) as t2 using(value);
Plan with Metrics:
CoalesceBatchesExec: target_batch_size=8192, metrics=[output_rows=8192, elapsed_compute=10.689µs]
  HashJoinExec: mode=Partitioned, join_type=Inner, on=[(value@0, value@0)], projection=[value@0], metrics=[output_rows=8192, build_input_batches=24, build_input_rows=8192, input_batches=24, input_rows=24574, output_batches=24, build_mem_used=275776, build_time=446.567µs, join_time=216.082µs]
    CoalesceBatchesExec: target_batch_size=8192, metrics=[output_rows=8192, elapsed_compute=20.988µs]
      RepartitionExec: partitioning=Hash([value@0], 24), input_partitions=24, metrics=[fetch_time=1.975326ms, repartition_time=45.457µs, send_time=6.671µs]
        RepartitionExec: partitioning=RoundRobinBatch(24), input_partitions=1, metrics=[fetch_time=15.114µs, repartition_time=1ns, send_time=976ns]
          LazyMemoryExec: partitions=1, batch_generators=[range: start=2, end=8194, batch_size=8192], metrics=[]
    CoalesceBatchesExec: target_batch_size=8192, metrics=[output_rows=24574, elapsed_compute=60.501µs]
      RepartitionExec: partitioning=Hash([value@0], 24), input_partitions=24, metrics=[fetch_time=1.925668ms, repartition_time=153.331µs, send_time=34.965µs]
        RepartitionExec: partitioning=RoundRobinBatch(24), input_partitions=1, metrics=[fetch_time=28.387µs, repartition_time=1ns, send_time=1.025µs]
          LazyMemoryExec: partitions=1, batch_generators=[range: start=2, end=24576, batch_size=8192], metrics=[]
1 row(s) fetched.
Elapsed 0.002 seconds.
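The two plans can be compared directly from their join metrics by dividing output_rows by output_batches. The sketch below does that arithmetic for the reported numbers; `avg_rows_per_batch` is a hypothetical helper, and both figures are totals summed over the operator's partitions, so the per-partition picture may differ.

```rust
/// Average rows per output batch, from the output_rows and
/// output_batches metrics reported by EXPLAIN ANALYZE.
fn avg_rows_per_batch(output_rows: u64, output_batches: u64) -> f64 {
    output_rows as f64 / output_batches as f64
}

fn main() {
    let target_batch_size = 8192.0;
    // Metrics copied from the two plans above.
    let nlj = avg_rows_per_batch(6, 3); // NestedLoopJoinExec
    let hash_join = avg_rows_per_batch(8192, 24); // HashJoinExec
    for (name, avg) in [("NestedLoopJoinExec", nlj), ("HashJoinExec", hash_join)] {
        println!(
            "{name}: {avg:.1} rows/batch ({:.3}% of target_batch_size)",
            100.0 * avg / target_batch_size
        );
    }
}
```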