importccl: blazing fast IMPORT #34809
Description
IMPORT is one of the first things our customers use in a POC and we use it extensively in our internal testing. Anything we can do to speed it up is pretty high-leverage.
We've laid a great foundation with direct ingestion of SSTables into RocksDB, but there is currently a big distributed sort that is limiting scalability and throughput. Luckily, the schema change team has recently switched index backfills to be based on a technique that uses direct SSTable ingestion, but without the distributed sort! There are a couple details to work out, but no fundamental reason we can't do the same thing for IMPORT, so now is a great time to revisit where we're at and lay down a roadmap for the medium-term future.
Specifically, I have the following goals:
- `fixtures import` should be as fast or faster than `fixtures load`
- `fixtures import` should scale to 100s of terabytes
`fixtures import` works by running IMPORT over CSVs that are synthesized from thin air, while `fixtures load` is a thin wrapper around RESTORE, so this goal is more-or-less that `IMPORT` on data already on a cluster should be faster than `RESTORE`ing data from google storage to a cluster on google compute. `IMPORT` has to do more work than `RESTORE`, so if this is possible, it's because of network bandwidth constraints.
At a high level, IMPORT currently reads a CSV line by line, converting it to our internal mvcc key/values. It works by putting these kvs in non-overlapping SSTables, which it then directly ingests. Because we don't have control of the order of rows in the CSVs (plus secondary indexes are basically never sorted in the same order), this requires that we first compute key splits such that the kv data between them is approximately the same size. Then we use these splits as buckets in a big distributed sort.
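As a rough illustration of the conversion step, here's a minimal Go sketch. The `toKV` helper, the table ID, and the plain-string encoding are all hypothetical simplifications; the real IMPORT encodes Datums with CockroachDB's internal key encoding.

```go
package main

import (
	"encoding/csv"
	"fmt"
	"strings"
)

// toKV sketches the CSV -> kv conversion: the primary key column becomes
// part of the encoded key, and the remaining columns become the value.
// This uses plain strings purely for illustration; the real conversion
// produces binary-encoded mvcc key/values.
func toKV(tableID string, record []string) (key, value string) {
	key = fmt.Sprintf("/%s/%s", tableID, record[0])
	value = strings.Join(record[1:], ",")
	return key, value
}

func main() {
	r := csv.NewReader(strings.NewReader("1,alice,30\n2,bob,25\n"))
	records, err := r.ReadAll()
	if err != nil {
		panic(err)
	}
	for _, rec := range records {
		k, v := toKV("52", rec)
		fmt.Println(k, "->", v)
	}
	// Prints:
	//   /52/1 -> alice,30
	//   /52/2 -> bob,25
}
```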
This big distributed sort means we do the CSV -> kv conversion twice. A distsql flow is set up to sample the splits, converting every CSV -> kvs in the process. Then a followup flow reruns the CSV -> kv conversion, except this time it routes everything according to the splits. This feeds into a second set of processors, which buffer every kv in a bucket, sort them, and run AddSSTable. These second processors are also responsible for detecting unique index violations (in either the primary index or a secondary index) by finding duplicate keys when constructing the SSTable for ingestion.
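The routing and duplicate-detection step can be sketched roughly like this, where `routeToBuckets` and `sortAndCheck` are hypothetical stand-ins for the distsql processors described above:

```go
package main

import (
	"bytes"
	"fmt"
	"sort"
)

// kv is a simplified mvcc key/value pair.
type kv struct {
	key, value []byte
}

// routeToBuckets assigns each kv to the bucket whose split range contains
// its key. splits are sorted, exclusive upper bounds, so there are
// len(splits)+1 buckets.
func routeToBuckets(kvs []kv, splits [][]byte) [][]kv {
	buckets := make([][]kv, len(splits)+1)
	for _, e := range kvs {
		i := sort.Search(len(splits), func(j int) bool {
			return bytes.Compare(e.key, splits[j]) < 0
		})
		buckets[i] = append(buckets[i], e)
	}
	return buckets
}

// sortAndCheck sorts one bucket and returns an error on a duplicate key,
// which in IMPORT would surface as a unique index violation.
func sortAndCheck(bucket []kv) error {
	sort.Slice(bucket, func(i, j int) bool {
		return bytes.Compare(bucket[i].key, bucket[j].key) < 0
	})
	for i := 1; i < len(bucket); i++ {
		if bytes.Equal(bucket[i-1].key, bucket[i].key) {
			return fmt.Errorf("duplicate key %q", bucket[i].key)
		}
	}
	return nil
}

func main() {
	splits := [][]byte{[]byte("g"), []byte("p")}
	kvs := []kv{{[]byte("q"), nil}, {[]byte("a"), nil}, {[]byte("h"), nil}}
	buckets := routeToBuckets(kvs, splits)
	for i, b := range buckets {
		if err := sortAndCheck(b); err != nil {
			fmt.Println("bucket", i, "error:", err)
		}
	}
	fmt.Println(len(buckets[0]), len(buckets[1]), len(buckets[2])) // 1 1 1
}
```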
There are a bunch of inefficiencies in this process, but running the CSV -> kv conversion twice and doing the distributed sort, along with all the buffering involved, dominate IMPORT's overall throughput. This is all because when we first built IMPORT, it was easier for AddSSTable to only work on non-overlapping buckets (and this was all it needed for RESTORE). The recent index backfill work has paved the way for doing overlapping AddSSTables, which means we can rip the double sample and the distributed sort out. @dt has even prototyped this in #34751! (Update: This merged in #34751!)
After that happens, it's worth looking into our new bottlenecks. As always, performance work should be benchmark driven, but here are some preliminary guesses at low-hanging fruit.
- It's silly to be using CSV as an intermediate format for importing workload data. The magic IMPORT thing works by taking a set of rows as query parameters and outputting the data as a CSV, which IMPORT then has to parse into rows of Datums. All of this is allocation and CPU heavy. The sql team is adding efficient columnar formats and they'll have to be able to serialize them to use them in distributed flows. If they end up using a standard format, we could add it as an alternate non-CSV format for IMPORT, reducing basically all the allocations between the data generation and the kv generation. (Update: dt had a better idea and it merged in importccl: skip detour though CSV when importing from workload #34892!)
- In AddSSTable, we compute range mvccstats by making an iterator that merges existing rocksdb data and sstable data and handing that iterator to our common mvccstats code. This is because there could be existing data in the range and (because of unavoidable internal retries) we could end up applying the same sstable twice. Instead, we could compute the mvccstats for the sstable as we construct it and blindly add it to the existing stats, setting the estimate flag. This will not always be correct, but it's probably a good enough guess for the duration of the IMPORT. Then, after the IMPORT is finished, we go back and compute the exact mvcc stats and clear the estimate flag. This means that we only iterate the keyspace being imported into once, not once per sstable over a given span.
- SSTables are ingested into RocksDB via the `IngestExternalFile` call. Historically, this meant we had to write them to disk twice, because RocksDB might need to modify a small part of the file, but raft might later need back the exact original payload. At some point, we added an optimization to first try `IngestExternalFile` disallowing any modifications to the file, and if that failed, a copy was made and `IngestExternalFile` was tried again allowing modifications. As of some recent RocksDB version, it's now possible to run `IngestExternalFile` in a mode that never modifies the file, guaranteeing that it's only written once. (Update: This merged in storage/engine,libroach: 1x write amplification on ingest sst #34886!)
- Profile and speed up the tpcc generator's initial data generation: workload/tpcc: initial data load perf improvements #35322, and probably more
- Switch `workload.InitialData.Batch` to be columnar, which will allow for drastically reduced allocations during initial data generation of frequently used workloads (tpcc, bank), while keeping the easy interface for other workloads: workload: allow columnar data generation (alloc -73%, MB/s +60%) #35349
- Profile and speed up IMPORT: sqlbase: avoid allocations due to ErrNameString, speed up IMPORT by 10% #35317, and certainly more
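To make the mvccstats estimate idea above concrete, here's a minimal sketch. The `mvccStats`, `statsForSST`, and `blindAdd` names are hypothetical stand-ins for the real MVCCStats machinery, and only a couple of fields are modeled:

```go
package main

import "fmt"

// mvccStats models a simplified slice of the stats IMPORT cares about.
type mvccStats struct {
	liveBytes, liveCount int64
	containsEstimates    bool
}

// statsForSST accumulates stats while the SSTable is being built, so
// ingestion doesn't need to iterate the existing range data at all.
func statsForSST(kvSizes []int64) mvccStats {
	var s mvccStats
	for _, sz := range kvSizes {
		s.liveBytes += sz
		s.liveCount++
	}
	return s
}

// blindAdd merges SSTable stats into the range's stats and marks them as
// estimates, since internal retries could double count and existing keys
// could be shadowed by the new data.
func blindAdd(rangeStats, sst mvccStats) mvccStats {
	rangeStats.liveBytes += sst.liveBytes
	rangeStats.liveCount += sst.liveCount
	rangeStats.containsEstimates = true
	return rangeStats
}

func main() {
	rangeStats := mvccStats{liveBytes: 100, liveCount: 2}
	sst := statsForSST([]int64{10, 20, 30})
	rangeStats = blindAdd(rangeStats, sst)
	fmt.Println(rangeStats.liveBytes, rangeStats.liveCount, rangeStats.containsEstimates) // 160 5 true
	// After the IMPORT finishes, a recomputation pass would replace these
	// with exact values and clear containsEstimates.
}
```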
TODO(dan): continue fleshing this all out and start running perf experiments
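The historical two-write ingest fallback mentioned above can be sketched as follows, where `ingest`, `ingestWithFallback`, and the copy path are hypothetical stand-ins for the RocksDB `IngestExternalFile` plumbing:

```go
package main

import (
	"errors"
	"fmt"
)

// errNeedsModification stands in for RocksDB rejecting a link-only ingest
// because it would have to rewrite part of the file.
var errNeedsModification = errors.New("ingest requires modifying the file")

// ingest is a stand-in for RocksDB's IngestExternalFile call; here it
// always demands modification so the fallback path is exercised.
func ingest(path string, allowModify bool) error {
	if !allowModify {
		return errNeedsModification
	}
	return nil
}

// ingestWithFallback shows the historical two-step approach: try the cheap
// no-modification ingest first, and only copy the file and retry with
// modifications allowed if that fails. The newer RocksDB mode makes the
// copy unnecessary, giving 1x write amplification.
func ingestWithFallback(path string) (copied bool, err error) {
	if err := ingest(path, false); err == nil {
		return false, nil
	}
	copyPath := path + ".copy" // hypothetical location for the second write
	return true, ingest(copyPath, true)
}

func main() {
	copied, err := ingestWithFallback("/tmp/import.sst")
	fmt.Println(copied, err) // true <nil>
}
```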
Epic CRDB-2340
Jira issue: CRDB-4624