
importccl: parallelize CSV-skipping workload IMPORT #36060

Closed

dt wants to merge 3 commits into cockroachdb:master from dt:workload-par

Conversation

@dt
Contributor

@dt dt commented Mar 22, 2019

(only last commit, first two commits are #36042)

This spins up multiple workers, each importing every i'th batch, to do
workload IMPORT.

As noted inline, this execution order, as opposed to assigning large
spans of batches to each worker, should mean that adjacent batches are
processed at roughly the same time and thus end up in the same
sort-batch for SST creation, preserving the non-overlapping SSTs when
the workload's batches are ordered and non-overlapping.

Release note: none.
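The striped assignment described above can be illustrated with a small standalone sketch (hypothetical helper names; the real code iterates inline rather than materializing slices):

```go
package main

import "fmt"

// stripedBatches returns the batch indices handled by one worker when
// batches [begin, end) are assigned round-robin: worker i takes batch
// begin+i, begin+i+workers, and so on. Adjacent batches land on different
// workers and are in flight at roughly the same time, so they tend to fall
// into the same sort-batch during SST creation.
func stripedBatches(begin, end, workers, worker int) []int {
	var out []int
	for b := begin + worker; b < end; b += workers {
		out = append(out, b)
	}
	return out
}

func main() {
	// With 8 batches and 3 workers, batches 0, 1, 2 are grabbed by
	// workers 0, 1, 2 respectively, so they are processed concurrently.
	for w := 0; w < 3; w++ {
		fmt.Printf("worker %d: %v\n", w, stripedBatches(0, 8, 3, w))
	}
}
```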

dt added 2 commits March 21, 2019 22:12
This extracts the logic for sending a progress update only when progress has meaningfully advanced, and for limiting update frequency,
from the existing logger, allowing it to be reused elsewhere.

Release note: none.
@dt dt requested review from a team and danhhz March 22, 2019 16:44
@cockroach-teamcity
Member

This change is Reviewable

Contributor

@danhhz danhhz left a comment


heh, I have a local PR that does this, which I think has some advantages over what you have here, but your insight about doing them in order is a good one and my impl doesn't do that. Just pushed it to 5f728d9 for reference. Take a look at some of the cleanups I did

Include the two benchmarks I have in my commit message in your commit message, please.

Reviewable status: :shipit: complete! 0 of 0 LGTMs obtained (waiting on @danhhz and @dt)


pkg/ccl/importccl/read_import_workload.go, line 33 at r3 (raw file):

type workloadReader struct {
	evalCtx *tree.EvalContext

One of the other impls says we need an evalCtx per worker because it's not threadsafe


pkg/ccl/importccl/read_import_workload.go, line 146 at r3 (raw file):

		// together and then ingested together in the same SST, minimizing the amount
		// of overlapping SSTs.
		// TODO(dt): on very long imports, these could drift. We might want to check

Much simpler than this and guaranteed to work is to have each worker use an atomic int to grab batches to process. I think this is worth doing in the initial PR instead of what you have here.

batchIdxAtomic := int64(conf.BatchBegin - 1)
for i := 0; i < workers; i++ {
	g.GoCtx(func(ctx context.Context) error {
		for {
			batch := int(atomic.AddInt64(&batchIdxAtomic, 1))
			if batch >= conf.BatchEnd {
				break
			}
			...
		}
		return nil
	})
}


pkg/ccl/importccl/read_import_workload.go, line 150 at r3 (raw file):

		// that we're significantly above it.
		for i := 0; i < workers; i++ {
			thread := i

s/thread/goroutine/ everywhere in this PR


pkg/ccl/importccl/read_import_workload.go, line 152 at r3 (raw file):

			thread := i
			g.GoCtx(func(ctx context.Context) error {
				conv, err := newRowConverter(w.table, w.evalCtx, w.kvCh)

move everything in this body into a method on (*workloadReader)


pkg/ccl/importccl/read_import_workload.go, line 161 at r3 (raw file):

				var rowIdx int64
				for b := begin + thread; b < end; b += workers {
					// log.Infof(ctx, "%s thread %d of %d importing batch %d", t.Name, thread, workers, b)

intentional?

Contributor Author

@dt dt left a comment


Added benchmarks -- they don't look quite the same as yours but are similar in shape and our starting times were pretty different so I'm guessing it is mostly hardware.

Reviewable status: :shipit: complete! 0 of 0 LGTMs obtained (waiting on @danhhz)


pkg/ccl/importccl/read_import_workload.go, line 33 at r3 (raw file):

Previously, danhhz (Daniel Harrison) wrote…

One of the other impls says we need an evalCtx per worker because it's not threadsafe

Huh, testrace didn't seem to mind, but we only use a couple fields of the ctx (for now), so maybe it just didn't hit the mutation part. Easy enough to make a copy for each and not worry about it.


pkg/ccl/importccl/read_import_workload.go, line 146 at r3 (raw file):

Previously, danhhz (Daniel Harrison) wrote…

Much simpler than this and guaranteed to work is to have each worker use an atomic int to grab batches to process. I think this is worth doing in the initial PR instead of what you have here.

batchIdxAtomic := int64(conf.BatchBegin - 1)
for i := 0; i < workers; i++ {
	g.GoCtx(func(ctx context.Context) error {
		for {
			batch := int(atomic.AddInt64(&batchIdxAtomic, 1))
			if batch >= conf.BatchEnd {
				break
			}
			...
		}
		return nil
	})
}

Many of the workloads appear to do one row / batch so I was trying to avoid having to do any cross-thread coordination for each row, but I guess in the bigger scheme of things, a barrier is probably trivial. It might also be the case that we should be making the Batch func return more than 1 row per call and that would probably have other benefits too (e.g. could bulk-alloc more).

Done.


pkg/ccl/importccl/read_import_workload.go, line 150 at r3 (raw file):

Previously, danhhz (Daniel Harrison) wrote…

s/thread/goroutine/ everywhere in this PR

Done.


pkg/ccl/importccl/read_import_workload.go, line 152 at r3 (raw file):

Previously, danhhz (Daniel Harrison) wrote…

move everything in this body into a method on (*workloadReader)

Done

well, actually introduced a new struct to hold extra per-file args it needs to close over and then a method on that.


pkg/ccl/importccl/read_import_workload.go, line 161 at r3 (raw file):

Previously, danhhz (Daniel Harrison) wrote…

intentional?

Done.

Contributor

@danhhz danhhz left a comment


:lgtm: Sorry if I'm being dense, but I don't see the benchmarks.

Reviewable status: :shipit: complete! 1 of 0 LGTMs obtained (waiting on @danhhz and @dt)


pkg/ccl/importccl/read_import_workload.go, line 146 at r3 (raw file):

Previously, dt (David Taylor) wrote…

Many of the workloads appear to do one row / batch so I was trying to avoid having to do any cross-thread coordination for each row, but I guess in the bigger scheme of things, a barrier is probably trivial. It might also be the case that we should be making the Batch func return more than 1 row per call and that would probably have other benefits too (e.g. could bulk-alloc more).

Done.

Yeah, I had the same thought, but it's easy to later make this grab more than 1 at a time if we have contention issues. (I suspect we won't for a while because the work of converting a row into kvs is much bigger than one atomic add.) Making batch funcs return more than one row is coming.


pkg/ccl/importccl/read_import_workload.go, line 142 at r4 (raw file):

		workers := workloadReaderWorker{
			w:                w,

let's denormalize whatever this needs from w into fields on this struct


pkg/ccl/importccl/read_import_workload.go, line 168 at r4 (raw file):

// in the SST builder, minimizing the amount of overlapping SSTs ingested.
func (w *workloadReaderWorker) run(ctx context.Context) error {
	evalCtx := w.w.newEvalCtx()

move this to where we instantiate the workloadReaderWorker

@dt dt closed this Mar 25, 2019
@dt dt deleted the workload-par branch March 25, 2019 17:14
@dt
Contributor Author

dt commented Mar 25, 2019

ugh, I realized I had accidentally detached HEAD when I was benchmarking and that's why the amend got lost, but apparently when I then pushed, it instead deleted the branch ... and GitHub says it can't be reopened.

Contributor Author

@dt dt left a comment


Reviewable status: :shipit: complete! 1 of 0 LGTMs obtained (waiting on @danhhz and @dt)


pkg/ccl/importccl/read_import_workload.go, line 142 at r4 (raw file):

Previously, danhhz (Daniel Harrison) wrote…

let's denormalize whatever this needs from w into fields on this struct

happy to do so, but it uses all three fields of w

Contributor Author

@dt dt left a comment


Reviewable status: :shipit: complete! 1 of 0 LGTMs obtained (waiting on @danhhz and @dt)


pkg/ccl/importccl/read_import_workload.go, line 168 at r4 (raw file):

Previously, danhhz (Daniel Harrison) wrote…

move this to where we instantiate the workloadReaderWorker

so the func is the worker -- there is only one struct instantiated. On second thought, I'm going back to no helper struct -- it seems like it complicates things more than just having an anon func inline above to close over all the per-file locals in the readFile loop.

craig bot pushed a commit that referenced this pull request Mar 27, 2019
36106: importccl: parallelize CSV-skipping workload IMPORT r=dt a=dt

(only last commit, first two commits are #36042)

Reopening #36060 after accidentally deleting branch and then github said it cannot re-open :/

This spins up multiple workers, each importing every i'th batch, to do
workload IMPORT.

As noted inline, this execution order, as opposed to assigning large
spans of batches to each worker, should mean that adjacent batches are
processed at roughly the same time and thus end up in the same
sort-batch for SST creation, preserving the non-overlapping SSTs when
the workload's batches are ordered and non-overlapping.

Release note: none.

Co-authored-by: David Taylor <tinystatemachine@gmail.com>
