importccl: add direct-ingest, no sort IMPORT mode prototype #34751
craig[bot] merged 3 commits into cockroachdb:master
Conversation
danhhz
left a comment
This also is going to need a cluster version check to be safe against one being started during a rolling upgrade. It also needs some amount of testing, but I'm assuming you were just waiting on that to see what we thought of the approach.
I'm in favor of merging it, so we can easily run larger-scale correctness and performance tests. I'm also assuming we can keep this from making it harder to reason about the production codepath (mainly, the big `if cp.spec.IngestDirectly` in readImportDataProcessor makes it harder to read than it already was).
For our own internal usage, can you think of any reason this would be incorrect as-is for loading tpcc? We know tpcc doesn't make any uniqueness violations.
Reviewable status:
complete! 0 of 0 LGTMs obtained (waiting on @danhhz, @dt, and @mjibson)
pkg/ccl/importccl/read_import_proc.go, line 481 at r3 (raw file):
defer tracing.FinishSpan(span)
if cp.spec.IngestDirectly {
we should find a way to clean this up before merging
pkg/ccl/importccl/read_import_proc.go, line 525 at r3 (raw file):
if len(buf) > 0 {
    sort.Sort(buf)
    for i := range buf {
pull this bit into a method-local func
pkg/jobs/jobspb/jobs.proto, line 99 at r3 (raw file):
// to add ranges with an old split point within them.
repeated bytes samples = 8;
bool ingest_directly = 11;
needs a comment
pkg/sql/distsql_plan_csv.go, line 703 at r3 (raw file):
dsp.FinalizePlan(planCtx, &p)
// Update job details with the sampled keys, as well as any parsed tables,
no sampled keys, right?
what happens to the job progress? as this thing runs?
pkg/sql/distsqlpb/processors.proto, line 664 at r3 (raw file):
optional int64 walltimeNanos = 11 [(gogoproto.nullable) = false];
optional bool ingestDirectly = 12 [(gogoproto.nullable) = false];
ditto this wants a comment
I thought about putting the whole thing behind a cluster setting instead of a query param -- that would avoid the churn in the job record and distsql spec, but I thought maybe it'd be easier to turn it on query-by-query? I dunno, could go either way. I should have also noted: this scrapped most progress stuff. I did wrap reads on the readers through a counter, though, so we have an idea of how much we've read on each reader. When we know the file length, I think for this single-pass import that almost works out to be the actual progress, give or take the flushing of the chan and SST intermediate buffers, so when we have the file length, I think we can get progress back. When we don't, I think it will just have to jump from 0 to done (per reader, at least). I think if/when we actually switch to this approach and delete the old code, we'll want to scrap transform.
it occurs to me, even without help from addsstable, I think we could fix the uniqueness violation with the same approach we currently use in addindex: counting the results. Initially I thought that didn't apply here since we didn't have a known row-count to compare to, but we actually do -- we can count rows as we import, in each reader, as we pass them to the row-converter to ingest, so we could pretty trivially know how many rows we expect to get back. That said, I think I'd still rather teach addSSTable to help us here, since it'd make it much easier to report the offending row than just a "got 5, expected 6".
madelynnblue
left a comment
Overall approach lgtm. Can you add at least an on-laptop perf-improvement benchstat to the commit message? Also, in light of Pete's recent email about release cycles, is this something that is planned to be in the next release?
// TODO(dt): buffer to disk instead of all in-mem.
writeTS := hlc.Timestamp{WallTime: cp.spec.WalltimeNanos}
adder, err := cp.flowCtx.BulkAdder(ctx, cp.flowCtx.ClientDB, 32<<20, writeTS)
inline comment on the 32<<20
buf = append(buf, kvBatch...)
if len(buf) > sortBatchSize {
    sort.Sort(buf)
This inner block could be moved to a common func so it shares impl code with the same block below.
w.r.t. the stabilization-period merge policy -- modulo cleanup concerns, I think if we keep it behind a setting or flag, the stability risk to what we're shipping is minimal. If we do go with a flag over a setting, having the additional proto fields in the protos when we cut the branch would reduce back-port conflicts.
danhhz
left a comment
I thought about putting the whole thing behind a cluster setting instead of a query param -- that would avoid the churn in the job record and distsql spec, but I thought maybe it'd be easier to turn it on query-by-query?
I lean toward query param, being able to opt in a single query on a cluster seems like good flexibility given how experimental this will merge as.
I think if/when we actually switch to this approach and delete the old code, we'll want to scrap transform.
Fine with me. fixtures import is way less hassle than fixtures restore and at some point I hope to make it as fast or faster. As you mention, import followed by backup is a fine replacement.
it occurs to me, even without help from addsstable, I think we could fix the uniqueness violation with the same approach we currently use in addindex: counting the results. Initially I thought that didn't apply here since we didn't have a known row-count to compare to, but we actually do -- we can count rows as we import, in each reader, as we pass them to the row-converter to ingest, so we could pretty trivially know how many rows we expect to get back.
that said, I think I'd still rather teach addSSTable to help us here, since it'd make it much easier to report the offending row than just a "got 5, expected 6".
Oooh, I like that. Yes, the better error message would be lovely, but any changes to AddSSTable will require careful thought, so this might be an okay interim placeholder. I also would like to keep flexibility for now about splitting as much work as we can out of AddSSTable (mvcc stat estimates, etc) while we're investigating perf stuff. If we moved to a world where mvcc stats were estimated during AddSSTable and fixed up in a later pass, that pass could also do these uniqueness violations, to reduce the number of times we have to read through every bit of data in rocksdb to once.
I think if we keep it behind a setting or flag, the stability risk to what we're shipping is minimal. If we do go with a flag over a setting, having the additional proto fields in the protos when we cut the branch would reduce back-port conflicts.
Agreed. I'm fine with merging this now behind a flag, modulo addressing my comments about making it a little more clear that the code changes don't affect the prod codepath.
Reviewable status:
complete! 0 of 0 LGTMs obtained (waiting on @dt and @mjibson)
SGTM, merge away.
dt
left a comment
Reviewable status:
complete! 0 of 0 LGTMs obtained (waiting on @danhhz and @mjibson)
pkg/ccl/importccl/read_import_proc.go, line 481 at r3 (raw file):
Previously, danhhz (Daniel Harrison) wrote…
we should find a way to clean this up before merging
Moving the buffer/flush/etc logic to a func helped a little. Moving the whole thing (i.e. the stuff that reads the spec and emits to the outputs) felt a little clumsy. If we wanted to go further, I'd just pull this out and start one of two go-routines instead of the if, but I think pulling the actual drain out helped some.
pkg/ccl/importccl/read_import_proc.go, line 489 at r3 (raw file):
Previously, mjibson (Matt Jibson) wrote…
inline comment on the
32<<20
Done.
pkg/ccl/importccl/read_import_proc.go, line 504 at r3 (raw file):
Previously, mjibson (Matt Jibson) wrote…
This inner block could be moved to a common func so it shares impl code with the same block below.
Done.
pkg/ccl/importccl/read_import_proc.go, line 525 at r3 (raw file):
Previously, danhhz (Daniel Harrison) wrote…
pull this bit into a method-local func
Done.
pkg/jobs/jobspb/jobs.proto, line 99 at r3 (raw file):
Previously, danhhz (Daniel Harrison) wrote…
needs a comment
Done.
pkg/sql/distsql_plan_csv.go, line 703 at r3 (raw file):
Previously, danhhz (Daniel Harrison) wrote…
no sampled keys, right?
what happens to the job progress? as this thing runs?
Right.
Progress-wise, we don't do much -- we no longer really have "steps" we complete along the way, since it is just a single pass now. If we know the file length, I think we can update progress with how much we've read. I'm fine with minimal progress support while this is experimental, though, and only focusing on that once it stops being side-by-side with the more complicated impl.
pkg/sql/distsqlpb/processors.proto, line 664 at r3 (raw file):
Previously, danhhz (Daniel Harrison) wrote…
ditto this wants a comment
Done.
madelynnblue
left a comment
Reviewable status:
complete! 0 of 0 LGTMs obtained (waiting on @danhhz, @dt, and @mjibson)
pkg/sql/distsql_plan_csv.go, line 703 at r3 (raw file):
Previously, dt (David Taylor) wrote…
Right.
Progress-wise, we don't do much -- we no longer really have "steps" we complete along the way, since it is just a single pass now. If we know the file length, I think we can update progress with how much we've read. I'm fine with minimal progress support while this is experimental, though, and only focusing on that once it stops being side-by-side with the more complicated impl.
We should definitely be able to do the normal file length progress just like the sampling phase does. Also 100% ok with not shipping that until later.
danhhz
left a comment
Reviewable status:
complete! 1 of 0 LGTMs obtained (waiting on @danhhz, @dt, and @mjibson)
pkg/ccl/importccl/import_stmt.go, line 699 at r5 (raw file):
}
var ingestDirectly bool
nit: _, ingestDirectly := opts[importOptionDirectIngest]
pkg/ccl/importccl/read_import_proc.go, line 476 at r5 (raw file):
// Sample KVs
group.GoCtx(func(ctx context.Context) error {
I think we could go one step further. Something like:
if cp.spec.IngestDirectly {
	group.GoCtx(func(ctx context.Context) error {
		...
	})
} else {
	group.GoCtx(func(ctx context.Context) error {
		...
	})
}
return group.Wait()
but I'll make my peace if you decide to merge as-is.
pkg/ccl/importccl/read_import_proc.go, line 482 at r5 (raw file):
defer tracing.FinishSpan(span)
// IngestDirectly means this reader will just ingest the whatever the KVs
nit: "ingest the whatever the"
pkg/ccl/importccl/read_import_proc.go, line 484 at r5 (raw file):
// IngestDirectly means this reader will just ingest the whatever the KVs
// producer emitted, and the only result we push into distsql at the end is
// an encoded BulkOpSummary of what we ingested.
nit: "is an encoded" -> "is one row with an encoded" (or otherwise specify one row gets output)
pkg/sql/distsql_plan_csv.go, line 703 at r3 (raw file):
Previously, mjibson (Matt Jibson) wrote…
We should definitely be able to do the normal file length progress just like the sampling phase does. Also 100% ok with not shipping that until later.
The workload ExportStorage impl is the one I most care about and doesn't support file length, so also okay with saving this for later
pkg/sql/distsql_plan_csv.go, line 662 at r5 (raw file):
}
// DistIngest blah.
"blah"
pkg/sql/distsql_plan_csv.go, line 674 at r5 (raw file):
ctx = logtags.AddTag(ctx, "import-distsql-ingest", nil)
ctx = logtags.AddTag(ctx, "import-distsql", nil)
adding both of these tags intentional?
pkg/sql/distsqlpb/processors.proto, line 664 at r3 (raw file):
Previously, dt (David Taylor) wrote…
Done.
Doesn't look like this one made it
dt
left a comment
Reviewable status:
complete! 1 of 0 LGTMs obtained (waiting on @danhhz and @mjibson)
pkg/ccl/importccl/import_stmt.go, line 699 at r5 (raw file):
Previously, danhhz (Daniel Harrison) wrote…
nit:
_, ingestDirectly := opts[importOptionDirectIngest]
Done.
pkg/ccl/importccl/read_import_proc.go, line 476 at r5 (raw file):
Previously, danhhz (Daniel Harrison) wrote…
I think we could go one step further. Something like:
if cp.spec.IngestDirectly {
	group.GoCtx(func(ctx context.Context) error {
		...
	})
} else {
	group.GoCtx(func(ctx context.Context) error {
		...
	})
}
return group.Wait()
but I'll make my peace if you decide to merge as-is.
yeah, that's what I meant below with the "If we wanted to go further, I'd just pull this out and start one of two go-routines instead of the if". I was initially avoiding doing that since wrapping the whole old block in an else made a big diff, and I was trying to keep it to the minimum churn in existing code. Did it now, though, since there are two of us who like the pattern and the majority of that diff is just the whitespace of indenting it.
pkg/ccl/importccl/read_import_proc.go, line 482 at r5 (raw file):
Previously, danhhz (Daniel Harrison) wrote…
nit: "ingest the whatever the"
Done.
pkg/ccl/importccl/read_import_proc.go, line 484 at r5 (raw file):
Previously, danhhz (Daniel Harrison) wrote…
nit: "is an encoded" -> "is one row with an encoded" (or otherwise specify one row gets output)
Done.
pkg/sql/distsql_plan_csv.go, line 703 at r3 (raw file):
Previously, danhhz (Daniel Harrison) wrote…
The workload ExportStorage impl is the one I most care about and doesn't support file length, so also okay with saving this for later
yeah, I'm punting for now -- experimental direct-ingest jobs will just be done when they're done, unless file-read progress just happens to work, but I don't want to mess with it for now.
I'm happy to work on actually making progress work after the 19.1 branch is cut, if we're confident we'll be making this the default and can delete the old IMPORT code and its more complex notions of progress (sampling vs read vs write).
fwiw, I've been thinking a bit about the workload use case, and I think the API separation that I introduced when we added the different format frontends (mysqlout/pgdump/etc) is actually pretty well suited for making a workload frontend: it gets a chan that it puts KVs on (which I think is along the lines of what dan has mentioned as far as skipping the strings/csv/parsing/datum/etc overhead), as well as a func (progress float64) it can call to indicate how far it is, which I think a workload generator could trivially do. Anyway, this is just to say: when we get there, I'm confident we can "fix" progress, but I don't want to think about a nice-to-have yet.
pkg/sql/distsql_plan_csv.go, line 662 at r5 (raw file):
Previously, danhhz (Daniel Harrison) wrote…
"blah"
Done.
pkg/sql/distsql_plan_csv.go, line 674 at r5 (raw file):
Previously, danhhz (Daniel Harrison) wrote…
adding both of these tags intentional?
Nope, thanks.
pkg/sql/distsqlpb/processors.proto, line 664 at r3 (raw file):
Previously, danhhz (Daniel Harrison) wrote…
Doesn't look like this one made it
Done.
danhhz
left a comment
Reviewable status:
complete! 1 of 0 LGTMs obtained (waiting on @danhhz, @dt, and @mjibson)
pkg/ccl/importccl/read_import_proc.go, line 476 at r5 (raw file):
Previously, dt (David Taylor) wrote…
yeah, that's what I meant below with the
If we wanted to go further, I'd just pull this out and start one of two go-routines instead of the if. I was initially avoiding doing that since wrapping the whole old block in an else made a big diff, and I was trying to keep it to the minimum churn in existing code. Did it now, though, since there are two of us who like the pattern and the majority of that diff is just the whitespace of indenting it.
reviewable has many faults, but it's pretty good about making it obvious when something simply got indented
dt
left a comment
Reviewable status:
complete! 1 of 0 LGTMs obtained (waiting on @danhhz, @dt, and @mjibson)
pkg/sql/distsql_plan_csv.go, line 703 at r3 (raw file):
Previously, dt (David Taylor) wrote…
yeah, I'm punting for now -- experimental direct-ingest jobs will just be done when they're done, unless file-read progress just happens to work, but I don't want to mess with it for now.
I'm happy to work on actually making progress work after the 19.1 branch is cut, if we're confident we'll be making this the default and can delete the old IMPORT code and its more complex notions of progress (sampling vs read vs write).
fwiw, I've been thinking a bit about the workload use case, and I think the API separation that I introduced when we added the different format frontends (mysqlout/pgdump/etc) is actually pretty well suited for making a workload frontend: it gets a chan that it puts KVs on (which I think is along the lines of what dan has mentioned as far as skipping the strings/csv/parsing/datum/etc overhead), as well as a func (progress float64) it can call to indicate how far it is, which I think a workload generator could trivially do. Anyway, this is just to say: when we get there, I'm confident we can "fix" progress, but I don't want to think about a nice-to-have yet.
I got curious to see whether a dedicated frontend would indeed make it as easy as I thought to fix progress, and indeed, it looks like this shouldn't be too tricky: a451831#diff-e37e9ef47c748ff28608fcc3e8407922R72
Oh, I almost forgot. This should get a cluster version check to protect against using the new ingest_directly field on ImportDetails during a rolling upgrade.
Nice!
dt
left a comment
Added cluster version check.
Reviewable status:
complete! 1 of 0 LGTMs obtained (waiting on @danhhz and @mjibson)
This adds a prototype of a no-sort IMPORT where SSTs are generated and added directly from the readers. Each reader builds a batch of keys, sorts it, makes it into an SST, and sends it.

This means ranges receive many overlapping SSTs — i.e. one from each batch in each reader — and are charged with compacting them, potentially increasing write amplification — but it avoids pushing all the KV data into distsql and sorting and buffering it in its entirety to produce perfectly non-overlapping SSTs.

The lack of a sort and of conflict detection when adding SSTs also means that we may no longer observe uniqueness/PK conflicts — if two different reader batches produce the same key, the one in the second SST applied would simply shadow the first. This issue was worked around in index backfills by counting the resulting index entries and comparing that to the number of rows in the table — since every row produced an entry, if one entry shadowed another, the counts would not line up. However, we will need to determine a way to detect conflicts during IMPORT before we could use this approach as the default, if we want to correctly return uniqueness violations.

This is an early prototype. All of the new functionality is behind a flag, so we could potentially merge it with the above issue unaddressed — to allow easier experimenting, tuning, and benchmarking.

Release note: none.
1c1eec3 to
dda42c9
direct_ingest import jobs should only be run on clusters where all nodes support them.
Release note: none.
anything left to do here?
merge it!
bors r+
34751: importccl: add direct-ingest, no sort IMPORT mode prototype r=dt a=dt

Co-authored-by: David Taylor <tinystatemachine@gmail.com>
Build succeeded