
kv/bulk: parallelize sending SSTs due to range bounds#79967

Merged
craig[bot] merged 2 commits into cockroachdb:master from dt:par-flush
Apr 22, 2022

Conversation

@dt
Contributor

@dt dt commented Apr 14, 2022

Previously, when the batcher determined it needed to finish one SST
and send it before starting another, it would wait for that send to
complete before moving on. When flushing a buffer that contained data
mapping to many ranges, this meant many serial flushes, e.g. flushing
512MB of data from a buffer whose keys were uniformly distributed over
a table split into 2000 ranges meant waiting for roughly 2000
sequential AddSSTable requests. When those requests were slow, for
example sometimes taking as much as 1s each or more, this became a
major bottleneck.

This change switches the batcher to send files that are ended due to a
range boundary asynchronously, queuing up the request to send and then
starting the next file while it sends, as long as memory capacity in the
monitor allows holding the extra file in memory (as these async sends
could result in using an entire extra buffer's worth of memory if they
all end up in-flight at once, which they easily could if the receivers
are queuing).

Addresses #79615

Release note (performance improvement): Bulk ingestion of unsorted data during IMPORT and schema changes uses a higher level of parallelism to send produced data to the storage layer.

@dt dt requested review from adityamaru and nvb April 14, 2022 20:26
@dt dt requested a review from a team as a code owner April 14, 2022 20:26
@cockroach-teamcity
Member

This change is Reviewable

The buffering adder took over making this split, so this was dead code.

Release note: none.
@dt dt force-pushed the par-flush branch 3 times, most recently from 30ba771 to 4b7b852 on April 21, 2022 12:53
@dt dt requested a review from a team April 21, 2022 12:53
@dt dt force-pushed the par-flush branch 2 times, most recently from f3da663 to 85318e5 on April 21, 2022 17:16
rowCounter storage.RowCounter

mu syncutil.Mutex
grp ctxgroup.Group
Contributor

Can we add a comment about which operations the group synchronizes, and when it is reset?

Contributor Author

I gave it a descriptive name instead. wdyt?

// rows written in the current batch.
rowCounter storage.RowCounter

mu syncutil.Mutex
Contributor

Can we wrap all the fields we want to lock/unlock in a:

mu struct {
    syncutil.Mutex
}

Contributor Author

Took a crack at this. It is a little tricky since we read some of these fields in BufferingAdder, but that happens after a call to Flush(), which calls Wait(), so it is mostly just the batcher itself that needs to lock.


// Flush sends the current batch, if any.
func (b *SSTBatcher) Flush(ctx context.Context) error {
if err := b.grp.Wait(); err != nil {
Contributor

Quick comment that we're waiting here for AddSSTable requests across ranges to return?

Contributor Author

think the name helps here?

@adityamaru adityamaru self-requested a review April 21, 2022 18:47
@dt dt force-pushed the par-flush branch 2 times, most recently from 5eb6274 to 8496692 on April 21, 2022 22:43
@dt
Contributor Author

dt commented Apr 21, 2022

Before and after on a large unsorted IMPORT. Note the shape, but also the y-axis labels:

[Screenshot: Screen Shot 2022-04-21 at 6 49 41 PM]

[Screenshot: Screen Shot 2022-04-21 at 6 49 33 PM]

@dt dt force-pushed the par-flush branch 3 times, most recently from f6bfa71 to 4272c8f on April 22, 2022 01:23
Contributor

@adityamaru adityamaru left a comment

LGTM sans one question.

-func (b *SSTBatcher) Close() {
+func (b *SSTBatcher) Close(ctx context.Context) {
 	b.sstWriter.Close()
 	if err := b.asyncAddSSTs.Wait(); err != nil {
Contributor

Swallowing the error is okay because we expect every user of the sstbatcher to flush before closing? Should we return an error, or maybe fatal, if we see an error here instead, so that in case there is an AddSSTable request that we have not waited for before, and it fails, we don't just carry on silently?

Contributor Author

@dt dt Apr 22, 2022

Yeah, if you don't Flush before Close, it is already the case that anything in sstWriter that hasn't been sent is lost, so you already have to Flush and check its error if you want to know whether everything you passed to Add was written successfully; this one seems moot. We only Wait here so we don't leak goroutines, and we don't care about the result.

Contributor

Makes sense, thanks

@dt
Contributor Author

dt commented Apr 22, 2022

TFTR!

bors r+

@craig
Contributor

craig bot commented Apr 22, 2022

Build failed:

@dt
Contributor Author

dt commented Apr 22, 2022

backupccl timeout with no stacks in kv/bulk

bors r+

@craig
Contributor

craig bot commented Apr 22, 2022

Build succeeded:

@craig craig bot merged commit 11a0a9f into cockroachdb:master Apr 22, 2022
@dt dt deleted the par-flush branch April 22, 2022 19:07
craig bot pushed a commit that referenced this pull request Apr 25, 2022
80467: kv/bulk: pull lastRange out of mu and always pass monitor r=dt a=dt

Follow-up work from #79967.

Pulling lastRange out of the mutex and only updating it on sync flushes simplifies the locking, since we don't need its result anyway for range-flushes (as we flushed because we were done with that range).

Additionally, the original patch added a mem monitor and used it unconditionally, however some external creators of batcher (restore, stream ingest) make their own batchers and were not populating it. Second commit fixes that.

Co-authored-by: David Taylor <tinystatemachine@gmail.com>

3 participants