Skip to content

Support left join reordering#229

Closed
Tmonster wants to merge 294 commits intomainfrom
support_left_join_reordering
Closed

Support left join reordering#229
Tmonster wants to merge 294 commits intomainfrom
support_left_join_reordering

Conversation

@Tmonster
Copy link
Owner

@Tmonster Tmonster commented Jan 16, 2025

This PR adds support for left join reordering to the join order optimizer.

This PR includes the following pieces of logic

  1. Removing unnecessary projections. This is needed because queries like (Select * from (select * from t1, t2) left join (select * from t3, t4) on (a = d)) have projections above the left and right subqueries. Currently, the Join Order Optimizer treats Projections as stand along relations and optimizes the children in isolation of the rest of the plan. In cases like this however, the projection is not necessary and is only added as a part of the process to plan a subquery.
  2. Different estimation logic for Left Joins. Right now our cardinality estimator supports inner joins, which is a relatively estimation. However, given that the cardinality of a table resulting from many joins is the same as regardless of the join order, we would like that to hold true for estimates as well. So finding an equation that respects this property took some time. The formula to get this number is below
  3. A refactor of how filters are extracted from Logical filters/Joins. This logic was sort of in two places before (query_graph_manager and relation_manager). Now it is all in relation_manager.cpp, and is a bit more readable (in my opinion).
  4. The actual logic to add the left join. After reading the paper On the correct and complete enumeration of the core search space I was able to gather that left joins should be treated the same as Semi and Anti joins. This means any join happening in the RHS of the Left Join cannot be pulled out of the RHS. Likewise, and join happening outside the RHS of the Left join cannot be pushed into the left join. Another rule that must be followed (making SEMI/ANTI different from LEFT is that any filter operating on a column of the RHS of a left join after the join must not be pushed into the left join). To enforce these rules, once a left join is extracted, all previous filters are visited and checked to see if they use the relation from the RHS of the left join. If yes, then all LHS relations of the left join are required in order to apply the filter).

I ran tpcds on this branch at sf100 and two queries that stood out were q40 and q80

benchmark/tpcds/sf1/q40.benchmark
Old timing: 0.106831
New timing: 0.025253

benchmark/tpcds/sf1/q80.benchmark
Old timing: 0.804367
New timing: 0.46505

taniabogatsch and others added 30 commits January 12, 2025 21:46
…ing stuff, and sadly includes the other patch as well..
…e the threshold at which late materialization is triggered
Maxxen and others added 28 commits January 20, 2025 10:54
This is already in the auto-install list, should be in the auto-load
list as well
…orage version when serializing a database (duckdb#15794)

Follow-up from duckdb#15702
Supersedes/builds on top of duckdb#14981

This PR change the `storage_compatibility_version` from being a setting
set on every session to be written in the database file.

Previously we would set this setting at run-time, and it would be shared
across all database instances:

```sql
ATTACH 'file1.db';
-- write something, to be serialized targeting version v0.10.0
SET storage_compatibility_version = 'v1.0.0';
ATTACH 'file2.db';
-- write something, to be serialized targeting v1.0.0
```

This has a number of issues:

* The storage compatibility version is shared across all attached
databases
* When restarting the system, the `storage_compatibility_version` would
revert back towards the default setting (currently `v0.10.0`)
* When reading a database, we did not know which storage compatibility
version was used, which could lead to hard to understand errors when
reading databases with an older version

### STORAGE_VERSION parameter

This PR reworks this so that the storage version is instead specified on
`ATTACH`. When none is specified:

* The version set in the `storage_compatibility_version` is used when
creating a new database
* The version stored within the database is used when loading an
existing database

As a result, we can target the storage version towards the desired
supported version when creating a new database. When opening an existing
database, we will keep on writing targeting the same DuckDB version
(i.e. we never automatically "upgrade" the file to a newer DuckDB
version). The user can *manually* upgrade a file by opening an older
file while targeting a later storage version.

For example:

```sql
-- use default `storage_compatibility_version`
ATTACH 'new_file.db';
-- explicitly target versions >= v1.2.0
ATTACH 'new_file.db' (STORAGE_VERSION 'v1.2.0');

-- use the storage version stored within the file
ATTACH 'existing_file.db';
-- use storage version v1.2.0 - if the file uses an older storage version, this upgrades the file
ATTACH 'existing_file.db' (STORAGE_VERSION 'v1.2.0');
```

Note that we cannot *downgrade* a file. If we try to open a file that
targets e.g. version v1.2.0 with an explicit storage version of v1.0.0,
we get an error:

```sql
ATTACH 'database_file.db' (STORAGE_VERSION 'v1.2.0');
DETACH database_file;

ATTACH 'database_file.db' (STORAGE_VERSION 'v1.0.0');
-- Error opening "database_file.db": cannot initialize database with storage version 2 - which is lower than what the database itself uses (4). The storage version of an existing database cannot be lowered.
```

### Opening with DuckDB < v1.1.3

When opening a file that targets `v1.2.0` in an older DuckDB version, we
now get a storage incompatibility error:

```sql
duckdb database_file.db
```

```
Error: unable to open database "database_file.db": IO Error: Trying to read a database file with version number 65, but we can only read version 64.
The database file was created with an newer version of DuckDB.

The storage of DuckDB is not yet stable; newer versions of DuckDB cannot read old database files and vice versa.
The storage will be stabilized when version 1.0 releases.

For now, we recommend that you load the database file in a supported version of DuckDB, and use the EXPORT DATABASE command followed by IMPORT DATABASE on the current version of DuckDB.

See the storage page for more information: https://duckdb.org/internals/storage
```

The description in the error is not entirely correct - but the error is
a lot more descriptive than the previous error that would be thrown in
this scenario (which was `INTERNAL Error: Unsupported compression
function type`).

The error message has also been improved in
duckdb#15702 already.
These tests were inadvertendly disabled because the name of the require flag changed sometime back.
These tests
```
test/sql/json/test_json_copy_tpch.test_slow -- 1
test/sql/aggregate/group/test_group_by_parallel.test_slow -- 2
test/sql/parallelism/intraquery/depth_first_evaluation.test_slow -- 3
```
 Were giving memory/storage issues in CI.

For (1.), I've reduced the TPC-H scale factor to 0.1 instead of 1. We're
still testing what we want to test, so this shouldn't be an issue (TPC-H
files are really large when exported to JSON).

For (2.), the setting `PRAGMA verify_external;` was triggering a lot of
partitioning and spilling to test the external grouped aggregation
behavior. It was spilling more than necessary because the HT size
calculation was off a bit, especially when this setting was enabled.

For (3.), a slight increase to the memory limit in the test was enough
to make it work again. We're still testing the behavior we want to.
This PR is a follow-up from duckdb#14175 that changed the return value of
`map_extract` and `element_at` to return a single value from maps by key
instead of a list with the value.

This is a good change, but unfortunately It breaks backwards/forwards
compatibility for serialized query plans. This PR fixes it by reverting
the change to `map_extract` but instead add a new `map_extract_value`
with the new behavior. It also changes the syntax sugar (e.g. `[]`) to
use this new function instead, so the end user still "mostly" sees the
new behavior, while the system internally still keeps the same behavior
for the actual function.
* SelectionVector(start, count) should respect the count when
initializing
* `ColumnFetchState` needs to be transaction-local, not part of the
global state
* Big ROW_GROUP_SIZE require explict STORAGE_VERSION
* ALTERNATIVE_VERIFY=1 implies default to be `latest`, so test do not
make much sense
These tests were inadvertently disabled because the name of the require
flag changed sometime back.

Note: The same should be checked on all extension tests (I think a test
from the fts extension has the same issue).
@Tmonster Tmonster closed this Jan 21, 2025
Tmonster pushed a commit that referenced this pull request Sep 4, 2025
…groups, and limiting vacuum operations for the last number of row groups (duckdb#18829)

When performing a checkpoint, we rewrite columns of row groups that have
had modifications. When performing insertions, all columns are modified,
thus all columns of a row group must be rewritten. As a result, any
insertion followed by a checkpoint triggers a full rewrite of the last
row group.

When dealing with wide tables or string columns with large strings,
rewriting a row group can be relatively expensive. This is particularly
the case when only small insertions have been made. For example,
inserting a single row followed by checkpointing can become expensive as
a result of triggering a rewrite of the last row group:

```sql
create table z as select repeat(sha256(i::varchar), 10) as a, repeat(sha256((i**2)::varchar), 10) as b, repeat(sha256((i**3)::varchar), 10) as c from range(100000) r(i);
CHECKPOINT;
.timer on
insert into z values ('a','b','c');
Run Time (s): real 0.006 user 0.004249 sys 0.001186
CHECKPOINT;
Run Time (s): real 0.611 user 1.163009 sys 0.054814
```

The checkpoint takes 0.6s, despite us inserting only a single row.

#### Appending a new row group

This PR solves this problem by appending a new row group when writing to
a table that has been persisted/checkpointed. As a result, we will no
longer modify the (already checkpointed) data. As a trade-off, we end up
with more (smaller) row groups.

#### Vacuuming
As part of vacuuming, we merge adjacent row groups if they fit within
the row group size. As part of this PR - we restrict merging for the
last few row groups. Otherwise, we would end up with the same problem.
We would create row groups like so:

```
100K rows
1 row
```

Then merge them back into a single row group with size `100K+1`,
effectively performing the same rewrite as before.

Instead, for the last row groups in a file, we need a *minimum
threshold* to merge. This is either the `row group size` itself, or 2X
the size of the first row group we are considering. We also always merge
row groups if the total size is `< 2048` (the vector size). This
exponential merging ensures we don't merge on every single insert, while
also ensuring we never have too many row groups.

We can see this process in action when starting with 100K rows, and
inserting 100 rows at a time - effectively repeating the below script:

```sql
insert into z select 'a','b','c' from range(100);
checkpoint;
select row_group_id, max(count) from pragma_storage_info('z') group by all order by all;
```

```sql
 -- insert batch #1
┌──────────────┬────────────┐
│ row_group_id │ max(count) │
│    int64     │   int64    │
├──────────────┼────────────┤
│            0 │     100000 │
│            1 │        100 │
└──────────────┴────────────┘

-- insert batch #30
┌──────────────┬────────────┐
│ row_group_id │ max(count) │
│    int64     │   int64    │
├──────────────┼────────────┤
│            0 │     100000 │
│            1 │       2000 │
│            2 │       1000 │
└──────────────┴────────────┘
-- insert batch #150
┌──────────────┬────────────┐
│ row_group_id │ max(count) │
│    int64     │   int64    │
├──────────────┼────────────┤
│            0 │     100000 │
│            1 │       8000 │
│            2 │       4000 │
│            3 │       2000 │
│            4 │       1000 │
└──────────────┴────────────┘
-- insert batch #228
┌──────────────┬────────────┐
│ row_group_id │ max(count) │
│    int64     │   int64    │
├──────────────┼────────────┤
│            0 │     100000 │
│            1 │      16000 │
│            2 │       4000 │
│            3 │       2000 │
│            4 │        800 │
└──────────────┴────────────┘
-- insert batch #229
┌──────────────┬────────────┐
│ row_group_id │ max(count) │
│    int64     │   int64    │
├──────────────┼────────────┤
│            0 │     122800 │
│            1 │        100 │
└──────────────┴────────────┘
```


### Performance

Running the above example again, the checkpoint now completes in `0.005`
seconds, instead of `0.6` seconds.

Note that we still do the compaction at some point, so while most
checkpoints will have gotten faster, not all of them will have gotten
faster.

Running the above example 300X, here are the timings:

```sql
create table z as select repeat(sha256(i::varchar), 10) as a, repeat(sha256((i**2)::varchar), 10) as b, repeat(sha256((i**3)::varchar), 10) as c from range(100000) r(i);
-- 300X the below insertion
insert into z select 'a','b','c' from range(100);
CHECKPOINT;
```

|           Summary           | v1.3.2  |  New   |
|-----------------------------|---------|--------|
| Total Time                  | 128.82s | 1.00s  |
| Average Time Per Checkpoint | 0.56s   | 0.01s  |
| Max Checkpoint Time         | 0.631s  | 0.526s |

In the above example, there is still a single checkpoint that does the
full on compaction, and thus has the same timing as the original script
had for every checkpoint.
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.