NestedLoopJoinExec can create excessively large record batches #12633

@mhilton

Description

Describe the bug

NestedLoopJoinExec (really NestedLoopJoinStream) produces one output batch for each probe-side input batch. However, each probe-side row can match every build-side row, so a single probe row can produce as many output rows as the build side has rows. This can lead to very large output batches.

As a result, some queries are being terminated unnecessarily because of high resource usage.

To Reproduce

Using datafusion-cli:

> SHOW datafusion.execution.batch_size;
+---------------------------------+-------+
| name                            | value |
+---------------------------------+-------+
| datafusion.execution.batch_size | 8192  |
+---------------------------------+-------+
1 row(s) fetched. 
Elapsed 0.010 seconds.

> CREATE TABLE test AS VALUES (0), (1), (2), (3), (4), (5), (6), (7), (8), (9);
0 row(s) fetched. 
Elapsed 0.035 seconds.

> EXPLAIN ANALYZE WITH test_t AS (SELECT concat(t1.column1, t2.column1, t3.column1, t4.column1, t5.column1) AS v FROM test t1, test t2, test t3, test t4, test t5) SELECT * FROM test_t tt1 FULL OUTER JOIN test_t tt2 ON tt1.v<>tt2.v;
+-------------------+-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
| plan_type         | plan                                                                                                                                                                                                                                                                                |
+-------------------+-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
| Plan with Metrics | NestedLoopJoinExec: join_type=Full, filter=v@0 != v@1, metrics=[output_rows=9999900000, build_input_batches=10000, build_input_rows=100000, input_batches=10000, input_rows=100000, output_batches=10001, build_mem_used=2492500, build_time=35.770239ms, join_time=309.795829686s] |
|                   |   CoalescePartitionsExec, metrics=[output_rows=100000, elapsed_compute=4.001292ms]                                                                                                                                                                                                  |
|                   |     ProjectionExec: expr=[concat(CAST(column1@1 AS Utf8), CAST(column1@2 AS Utf8), CAST(column1@3 AS Utf8), CAST(column1@4 AS Utf8), CAST(column1@0 AS Utf8)) as v], metrics=[output_rows=100000, elapsed_compute=54.673797ms]                                                      |
|                   |       CrossJoinExec, metrics=[output_rows=100000, build_input_batches=1, build_input_rows=10, input_batches=1000, input_rows=10000, output_batches=10000, build_mem_used=224, build_time=31.793µs, join_time=9.555719ms]                                                            |
|                   |         MemoryExec: partitions=1, partition_sizes=[1], metrics=[]                                                                                                                                                                                                                   |
|                   |         RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1, metrics=[fetch_time=1.579937ms, repartition_time=1ns, send_time=8.415125ms]                                                                                                                          |
|                   |           ProjectionExec: expr=[column1@1 as column1, column1@2 as column1, column1@3 as column1, column1@0 as column1], metrics=[output_rows=10000, elapsed_compute=375.839µs]                                                                                                     |
|                   |             CrossJoinExec, metrics=[output_rows=10000, build_input_batches=1, build_input_rows=10, input_batches=100, input_rows=1000, output_batches=1000, build_mem_used=224, build_time=4.541µs, join_time=567.211µs]                                                            |
|                   |               MemoryExec: partitions=1, partition_sizes=[1], metrics=[]                                                                                                                                                                                                             |
|                   |               ProjectionExec: expr=[column1@1 as column1, column1@2 as column1, column1@0 as column1], metrics=[output_rows=1000, elapsed_compute=35.377µs]                                                                                                                         |
|                   |                 CrossJoinExec, metrics=[output_rows=1000, build_input_batches=1, build_input_rows=10, input_batches=10, input_rows=100, output_batches=100, build_mem_used=224, build_time=1.917µs, join_time=52.879µs]                                                             |
|                   |                   MemoryExec: partitions=1, partition_sizes=[1], metrics=[]                                                                                                                                                                                                         |
|                   |                   CrossJoinExec, metrics=[output_rows=100, build_input_batches=1, build_input_rows=10, input_batches=1, input_rows=10, output_batches=10, build_mem_used=224, build_time=2.417µs, join_time=11.377µs]                                                               |
|                   |                     MemoryExec: partitions=1, partition_sizes=[1], metrics=[]                                                                                                                                                                                                       |
|                   |                     MemoryExec: partitions=1, partition_sizes=[1], metrics=[]                                                                                                                                                                                                       |
|                   |   ProjectionExec: expr=[concat(CAST(column1@1 AS Utf8), CAST(column1@2 AS Utf8), CAST(column1@3 AS Utf8), CAST(column1@4 AS Utf8), CAST(column1@0 AS Utf8)) as v], metrics=[output_rows=100000, elapsed_compute=524.070156ms]                                                       |
|                   |     CrossJoinExec, metrics=[output_rows=100000, build_input_batches=1, build_input_rows=10, input_batches=1000, input_rows=10000, output_batches=10000, build_mem_used=224, build_time=7.084µs, join_time=73.915678ms]                                                              |
|                   |       MemoryExec: partitions=1, partition_sizes=[1], metrics=[]                                                                                                                                                                                                                     |
|                   |       RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1, metrics=[fetch_time=2.26374ms, repartition_time=1ns, send_time=28.111445436s]                                                                                                                          |
|                   |         ProjectionExec: expr=[column1@1 as column1, column1@2 as column1, column1@3 as column1, column1@0 as column1], metrics=[output_rows=10000, elapsed_compute=477.113µs]                                                                                                       |
|                   |           CrossJoinExec, metrics=[output_rows=10000, build_input_batches=1, build_input_rows=10, input_batches=100, input_rows=1000, output_batches=1000, build_mem_used=224, build_time=2.75µs, join_time=1.069152ms]                                                              |
|                   |             MemoryExec: partitions=1, partition_sizes=[1], metrics=[]                                                                                                                                                                                                               |
|                   |             ProjectionExec: expr=[column1@1 as column1, column1@2 as column1, column1@0 as column1], metrics=[output_rows=1000, elapsed_compute=94.708µs]                                                                                                                           |
|                   |               CrossJoinExec, metrics=[output_rows=1000, build_input_batches=1, build_input_rows=10, input_batches=10, input_rows=100, output_batches=100, build_mem_used=224, build_time=708ns, join_time=165.327µs]                                                                |
|                   |                 MemoryExec: partitions=1, partition_sizes=[1], metrics=[]                                                                                                                                                                                                           |
|                   |                 CrossJoinExec, metrics=[output_rows=100, build_input_batches=1, build_input_rows=10, input_batches=1, input_rows=10, output_batches=10, build_mem_used=224, build_time=583ns, join_time=9.25µs]                                                                     |
|                   |                   MemoryExec: partitions=1, partition_sizes=[1], metrics=[]                                                                                                                                                                                                         |
|                   |                   MemoryExec: partitions=1, partition_sizes=[1], metrics=[]                                                                                                                                                                                                         |
|                   |                                                                                                                                                                                                                                                                                     |
+-------------------+-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
1 row(s) fetched. 
Elapsed 34.928 seconds.

In this (obviously contrived) example, the NestedLoopJoinExec produces 9999900000 output_rows in just 10001 output_batches, so the mean batch size is about 999890.01 rows. That is significantly larger than the configured batch_size of 8192.
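The arithmetic behind those numbers can be checked with a small sketch. The helper names below are hypothetical and are not part of DataFusion. The figure of 10 rows per probe batch is only an average taken from the metrics above (input_rows=100000 over input_batches=10000), and the sketch assumes the probe batches are roughly uniform.

```rust
/// Upper bound on the rows in one output batch when the join emits one
/// output batch per probe-side batch and every probe row can match
/// every build-side row.
/// Hypothetical helper for illustration only.
fn max_output_rows_per_batch(probe_batch_rows: u64, build_rows: u64) -> u64 {
    probe_batch_rows * build_rows
}

/// Mean rows per output batch, computed from the `output_rows` and
/// `output_batches` metrics reported by EXPLAIN ANALYZE.
/// Hypothetical helper for illustration only.
fn mean_batch_rows(output_rows: u64, output_batches: u64) -> f64 {
    output_rows as f64 / output_batches as f64
}
```

With 10 probe rows per batch and build_input_rows=100000, the bound is 1000000 rows per output batch, which is consistent with the observed mean of about 999890 rows, roughly 122 times the configured 8192.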

Expected behavior

NestedLoopJoinExec should produce output batches much closer in length to the configured batch size, reducing the memory used by output record batches.
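As a rough illustration of the expected behavior, the sketch below splits a row count into consecutive ranges of at most batch_size rows. `chunk_ranges` is a hypothetical helper, not DataFusion code, and it only shows the slicing arithmetic. An actual fix would more likely need to limit how many rows are joined per step, so that the oversized batch is never materialized in the first place.

```rust
use std::ops::Range;

/// Split `total_rows` into consecutive row ranges, each covering at most
/// `batch_size` rows. Only the final range may be shorter.
/// Hypothetical helper for illustration only.
fn chunk_ranges(total_rows: usize, batch_size: usize) -> Vec<Range<usize>> {
    assert!(batch_size > 0, "batch_size must be non-zero");
    (0..total_rows)
        .step_by(batch_size)
        .map(|start| start..usize::min(start + batch_size, total_rows))
        .collect()
}
```

For a join result of 1000000 rows and a batch size of 8192, this yields 123 ranges: 122 full ranges and a final range of 576 rows.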

Additional context

No response

Metadata

Labels: bug (Something isn't working)
