importccl: parallelize CSV-skipping workload IMPORT#36060
dt wants to merge 3 commits into cockroachdb:master from
Conversation
This extracts, from the existing logger, the logic for sending a progress update only when progress has meaningfully advanced and for limiting the frequency of updates, allowing it to be reused elsewhere.
Release note: none.
danhhz
left a comment
heh, I have a local PR that does this, which I think has some advantages over what you have here, but your insight about doing them in order is a good one and my impl doesn't do that. Just pushed it to 5f728d9 for reference. Take a look at some of the cleanups I did
Include the two benchmarks I have in my commit message in your commit message, please.
Reviewable status:
complete! 0 of 0 LGTMs obtained (waiting on @danhhz and @dt)
pkg/ccl/importccl/read_import_workload.go, line 33 at r3 (raw file):
type workloadReader struct {
	evalCtx *tree.EvalContext
One of the other impls says we need an evalCtx per worker because it's not threadsafe
pkg/ccl/importccl/read_import_workload.go, line 146 at r3 (raw file):
// together and then ingested together in the same SST, minimizing the amount
// of overlapping SSTs.
// TODO(dt): on very long imports, these could drift. We might want to check
Much simpler than this and guaranteed to work is to have each worker use an atomic int to grab batches to process. I think this is worth doing in the initial PR instead of what you have here.
batchIdxAtomic := int64(conf.BatchBegin - 1)
for i := 0; i < workers; i++ {
	g.GoCtx(func(ctx context.Context) error {
		for {
			batch := int(atomic.AddInt64(&batchIdxAtomic, 1))
			if batch >= conf.BatchEnd {
				break
			}
			...
		}
		return nil
	})
}
pkg/ccl/importccl/read_import_workload.go, line 150 at r3 (raw file):
// that we're significantly above it.
for i := 0; i < workers; i++ {
	thread := i
s/thread/goroutine/ everywhere in this PR
pkg/ccl/importccl/read_import_workload.go, line 152 at r3 (raw file):
thread := i
g.GoCtx(func(ctx context.Context) error {
	conv, err := newRowConverter(w.table, w.evalCtx, w.kvCh)
move everything in this body into a method on (*workloadReader)
pkg/ccl/importccl/read_import_workload.go, line 161 at r3 (raw file):
var rowIdx int64
for b := begin + thread; b < end; b += workers {
	// log.Infof(ctx, "%s thread %d of %d importing batch %d", t.Name, thread, workers, b)
intentional?
This spins up multiple workers, each importing every i-th batch, to do workload IMPORT. As noted inline, this execution order, as opposed to assigning large spans of batches to each worker, should mean that adjacent batches are processed at roughly the same time and thus end up in the same sort-batch for SST creation, preserving the non-overlapping SSTs when the workload's batches are ordered and non-overlapping.
Release note: none.
dt
left a comment
Added benchmarks -- they don't look quite the same as yours, but they are similar in shape, and our starting times were pretty different, so I'm guessing it's mostly hardware.
Reviewable status:
complete! 0 of 0 LGTMs obtained (waiting on @danhhz)
pkg/ccl/importccl/read_import_workload.go, line 33 at r3 (raw file):
Previously, danhhz (Daniel Harrison) wrote…
One of the other impls says we need an evalCtx per worker because it's not threadsafe
Huh, testrace didn't seem to mind, but we only use a couple fields of the ctx (for now), so maybe it just didn't hit the mutation part. Easy enough to make a copy for each and not worry about it.
pkg/ccl/importccl/read_import_workload.go, line 146 at r3 (raw file):
Previously, danhhz (Daniel Harrison) wrote…
Much simpler than this and guaranteed to work is to have each worker use an atomic int to grab batches to process. I think this is worth doing in the initial PR instead of what you have here.
batchIdxAtomic := int64(conf.BatchBegin - 1)
for i := 0; i < workers; i++ {
	g.GoCtx(func(ctx context.Context) error {
		for {
			batch := int(atomic.AddInt64(&batchIdxAtomic, 1))
			if batch >= conf.BatchEnd {
				break
			}
			...
		}
		return nil
	})
}
Many of the workloads appear to do one row per batch, so I was trying to avoid any cross-thread coordination on each row, but in the bigger scheme of things a barrier is probably trivial. It might also be the case that the Batch func should return more than one row per call, which would probably have other benefits too (e.g. could bulk-alloc more).
Done.
pkg/ccl/importccl/read_import_workload.go, line 150 at r3 (raw file):
Previously, danhhz (Daniel Harrison) wrote…
s/thread/goroutine/ everywhere in this PR
Done.
pkg/ccl/importccl/read_import_workload.go, line 152 at r3 (raw file):
Previously, danhhz (Daniel Harrison) wrote…
move everything in this body into a method on (*workloadReader)
Done
Well, actually I introduced a new struct to hold the extra per-file args it needs to close over, and then a method on that.
pkg/ccl/importccl/read_import_workload.go, line 161 at r3 (raw file):
Previously, danhhz (Daniel Harrison) wrote…
intentional?
Done.
danhhz
left a comment
Sorry if I'm being dense, but I don't see the benchmarks.
Reviewable status:
complete! 1 of 0 LGTMs obtained (waiting on @danhhz and @dt)
pkg/ccl/importccl/read_import_workload.go, line 146 at r3 (raw file):
Previously, dt (David Taylor) wrote…
Many of the workloads appear to do one row / batch so I was trying to avoid having to do any cross-thread coordination for each row, but I guess in the bigger scheme of things, a barrier is probably trivial. It might also be the case that we should be making the Batch func return more than 1 row per call and that would probably have other benefits too (e.g. could bulk-alloc more).
Done.
Yeah, I had the same thought, but it's easy to later make this grab more than 1 at a time if we have contention issues. (I suspect we won't for a while because the work of converting a row into kvs is much bigger than one atomic add.) Making batch funcs return more than one row is coming.
pkg/ccl/importccl/read_import_workload.go, line 142 at r4 (raw file):
workers := workloadReaderWorker{
	w: w,
let's denormalize whatever this needs from w into fields on this struct
pkg/ccl/importccl/read_import_workload.go, line 168 at r4 (raw file):
// in the SST builder, minimizing the amount of overlapping SSTs ingested.
func (w *workloadReaderWorker) run(ctx context.Context) error {
	evalCtx := w.w.newEvalCtx()
move this to where we instantiate the workloadReaderWorker
ugh, I realized I had accidentally detached HEAD when I was benchmarking, which is why the amend got lost. But apparently when I then pushed, it deleted the branch instead, and GitHub says it can't be reopened.
dt
left a comment
Reviewable status:
complete! 1 of 0 LGTMs obtained (waiting on @danhhz and @dt)
pkg/ccl/importccl/read_import_workload.go, line 142 at r4 (raw file):
Previously, danhhz (Daniel Harrison) wrote…
let's denormalize whatever this needs from w into fields on this struct
happy to do so, but it uses all three fields of w
dt
left a comment
Reviewable status:
complete! 1 of 0 LGTMs obtained (waiting on @danhhz and @dt)
pkg/ccl/importccl/read_import_workload.go, line 168 at r4 (raw file):
Previously, danhhz (Daniel Harrison) wrote…
move this to where we instantiate the workloadReaderWorker
so the func is the worker -- there is only one struct instantiated. On second thought, I'm going back to no helper struct -- it seems to complicate things more than just having an anonymous func inline in the readFile loop to close over all the per-file locals.
36106: importccl: parallelize CSV-skipping workload IMPORT r=dt a=dt
Reopening #36060 after accidentally deleting branch, after which GitHub said it cannot be re-opened :/
Co-authored-by: David Taylor <tinystatemachine@gmail.com>
(only last commit, first two commits are #36042)
This spins up multiple workers, each importing every i'th batch, to do
workload IMPORT.
As noted inline, this execution order, as opposed to assigning large
spans of batches to each worker, should mean that adjacent batches are
processed at roughly the same time and thus end up in the same
sort-batch for SST creation, preserving the non-overlapping SSTs when
the workload's batches are ordered and non-overlapping.
Release note: none.