
Add Minimum Batch Index + Order Preserving Insertion Rework#7352

Merged
Mytherin merged 44 commits into duckdb:master from Mytherin:minimumbatchindex on May 4, 2023

Conversation

@Mytherin Mytherin (Collaborator) commented May 3, 2023

This PR adds a minimum batch index to pipelines that maintain insertion order using batch indexes (see #3700). The minimum batch index signifies that at no point in the future will any thread work on a batch index lower than that one. This, coupled with a new callback in the physical operator (NextBatch), allows for better parallel processing with batch indexes. It enables the following scenarios:

  • When the minimum batch index is larger than previously written batches (e.g. a batch with batch_index 7 when the minimum_batch_index is 10), we can write those batches out to disk in order, knowing that no rows will ever arrive that come before them
  • When the minimum batch index is larger than previously written batches, we can merge small batches together (e.g. if the minimum_batch_index is 10, we can merge the batches with indexes 7 and 9, knowing that a batch with index 8 will never exist)
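The flushing scenario above can be sketched as follows. This is a hypothetical Python illustration of the gating logic, not DuckDB's actual C++ implementation; the class and method names are invented for this sketch:

```python
# Hypothetical sketch: buffer out-of-order batches, and flush a batch
# in order once the minimum batch index guarantees that no batch with
# a lower index can still arrive from any thread.

class OrderedSink:
    def __init__(self):
        self.pending = {}   # batch_index -> rows, buffered in memory
        self.written = []   # batch indexes flushed to "disk", in order

    def add_batch(self, batch_index, rows):
        self.pending[batch_index] = rows

    def on_min_batch_index(self, minimum_batch_index):
        # Every pending batch below the minimum is final: no thread will
        # ever produce a smaller batch index, so write them out in order.
        ready = sorted(i for i in self.pending if i < minimum_batch_index)
        for i in ready:
            self.written.append(i)
            del self.pending[i]

sink = OrderedSink()
sink.add_batch(9, ["row a"])
sink.add_batch(7, ["row b"])
sink.on_min_batch_index(8)   # only batch 7 is final
print(sink.written)          # [7]
sink.on_min_batch_index(10)  # now batch 9 is final as well
print(sink.written)          # [7, 9]
```

Note that batch 9 stays buffered after the first callback even though it arrived first: order preservation requires waiting until the minimum batch index passes it.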

Batch Insert Rework

This PR also reworks the batch insert into DuckDB tables to take advantage of the minimum batch index. The previous batch index insert had several limitations:

  • We would only merge adjacent batch indexes (e.g. 7 and 8 would be merged, but not 7 and 9 if 8 was missing). That was because we had no way of knowing whether 8 would still appear at a later time. Gaps in batch indexes are common when filters are present or in the case of unions.
  • Every thread had its own PartialBlockManager that was flushed independently. For highly compressible data or smaller data sets, this could lead to many half-empty blocks being written to disk unnecessarily, compared to the single-threaded case.
  • We would only flush batches to disk if they were larger than a row group, or if they could be merged into batches larger than a row group. This could leave a lot of data that was never optimistically flushed to disk, leading to high memory requirements depending on the batch size.
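The first limitation can be seen in a small sketch. This is a hypothetical illustration (the function is invented for this example, not DuckDB code) of why adjacent-only merging stalls whenever batch indexes have gaps:

```python
# Hypothetical sketch: adjacent-only merging can only combine runs of
# consecutive batch indexes, so a gap permanently blocks a merge.

def merge_adjacent_only(batch_indexes):
    """Group sorted batch indexes into runs of consecutive values."""
    groups, run = [], [batch_indexes[0]]
    for i in batch_indexes[1:]:
        if i == run[-1] + 1:
            run.append(i)   # consecutive: extend the current run
        else:
            groups.append(run)  # gap: the run can never grow further
            run = [i]
    groups.append(run)
    return groups

# With a gap at 8 (e.g. all of its rows were filtered out), 7 and 9
# stay separate forever, since without a minimum batch index we cannot
# know whether batch 8 might still arrive later.
print(merge_adjacent_only([5, 6, 7, 9]))  # [[5, 6, 7], [9]]
```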

These issues resulted in order preserving loads (1) generating larger than required databases, and (2) holding more data in memory than required.

This PR fixes those issues by reworking the way the batched data insertion works.

  • We use the minimum batch index to merge batches, even if they are not adjacent (e.g. if the minimum batch index is 10, we can merge 7 and 9).
  • When we finish writing, we merge the PartialBlockManagers of all threads. This co-locates row groups across threads, allowing us to match the single-threaded database size.
  • We flush merged batches to disk only after they exceed 3 times the row group size. This reduces the worst-case fragmentation from 1 full row group followed by a row group holding a single row, to 3 full row groups followed by a row group holding a single row.
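The new merge policy can be sketched as follows. This is a hypothetical Python illustration, with invented names; the only DuckDB-specific value assumed is the default row group size of 122,880 rows:

```python
# Hypothetical sketch: batches whose index is below the minimum batch
# index are final and may be merged regardless of gaps; a merged group
# is flushed once it exceeds 3 times the row group size.

ROW_GROUP_SIZE = 122_880             # DuckDB's default row group size
FLUSH_THRESHOLD = 3 * ROW_GROUP_SIZE

def plan_flushes(pending, minimum_batch_index):
    """pending: {batch_index: row_count}. Returns (flushes, still_pending)."""
    final = sorted(i for i in pending if i < minimum_batch_index)
    flushes, group, group_rows = [], [], 0
    for i in final:
        group.append(i)
        group_rows += pending[i]
        if group_rows > FLUSH_THRESHOLD:
            flushes.append(group)        # large enough: write to disk
            group, group_rows = [], 0
    flushed = {i for g in flushes for i in g}
    # Batches under the threshold (or not yet final) stay buffered.
    still_pending = {i: n for i, n in pending.items() if i not in flushed}
    return flushes, still_pending

pending = {7: 200_000, 9: 200_000, 12: 50_000}
flushes, rest = plan_flushes(pending, minimum_batch_index=10)
print(flushes)       # [[7, 9]] - merged despite the gap at 8
print(sorted(rest))  # [12]     - batch 12 is not yet final
```

Batches 7 and 9 together hold 400,000 rows, which exceeds the 368,640-row threshold, so they are merged and flushed even though index 8 never appeared.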

These changes make parallel order-preserving insertion much more robust against varying batch sizes, and should make it comparable to parallel non-order-preserving insertion in both performance and the size of the database it generates. In fact, it should often generate smaller databases, because data in insertion order is frequently more compressible.

Many Small Row Groups

```sql
COPY (FROM range(100000000)) TO 'small.parquet' (ROW_GROUP_SIZE 5000);
CREATE TABLE small AS FROM 'small.parquet';
```

| Measure      | v0.7.1 | New   | Single-Threaded | preserve_insertion_order=false |
|--------------|--------|-------|-----------------|--------------------------------|
| Time (s)     | 1.0s   | 0.62s | 2.6s            | 0.5s                           |
| DB Size      | 4.5MB  | 2.4MB | 2.5MB           | 2.8MB                          |
| # Row Groups | 1382   | 822   | 814             | 821                            |

Many Average Sized Row Groups

```sql
COPY (FROM range(100000000)) TO 'medium.parquet' (ROW_GROUP_SIZE 200000);
CREATE TABLE medium AS FROM 'medium.parquet';
```

| Measure      | v0.7.1 | New   | Single-Threaded | preserve_insertion_order=false |
|--------------|--------|-------|-----------------|--------------------------------|
| Time (s)     | 0.36s  | 0.37s | 2.6s            | 0.36s                          |
| DB Size      | 4.0MB  | 2.5MB | 2.5MB           | 2.8MB                          |
| # Row Groups | 997    | 997   | 830             | 819                            |

Many Big Row Groups

```sql
COPY (FROM range(100000000)) TO 'big.parquet' (ROW_GROUP_SIZE 1000000);
CREATE TABLE big AS FROM 'big.parquet';
```

| Measure      | v0.7.1 | New   | Single-Threaded | preserve_insertion_order=false |
|--------------|--------|-------|-----------------|--------------------------------|
| Time (s)     | 0.37s  | 0.37s | 2.6s            | 0.38s                          |
| DB Size      | 3.8MB  | 2.3MB | 2.5MB           | 2.8MB                          |
| # Row Groups | 898    | 898   | 814             | 819                            |

@Mytherin Mytherin merged commit 9b1d80a into duckdb:master May 4, 2023
@Mytherin Mytherin deleted the minimumbatchindex branch May 5, 2023 12:10
