kv/bulk: parallelize sending SSTs due to range bounds#79967
craig[bot] merged 2 commits into cockroachdb:master
Conversation
The buffering adder took over making this split, so this was dead code. Release note: none.
pkg/kv/bulk/sst_batcher.go (outdated)

```go
	rowCounter storage.RowCounter

	mu  syncutil.Mutex
	grp ctxgroup.Group
```
Can we add a comment about what operations the group synchronizes, and when it is reset?
I gave it a descriptive name instead. wdyt?
pkg/kv/bulk/sst_batcher.go (outdated)

```go
	// rows written in the current batch.
	rowCounter storage.RowCounter

	mu syncutil.Mutex
```
Can we wrap all the fields we want to lock/unlock in an:

```go
mu struct {
	syncutil.Mutex
}
```
Took a crack at this. It is a little tricky since we read from some of these fields in BufferingAdder but that is after a call to Flush() which called Wait(), so it is mostly just the batcher itself that needs to lock.
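For readers unfamiliar with this CockroachDB convention, here is a minimal sketch of the pattern being suggested, using the standard library's `sync.Mutex` in place of `syncutil.Mutex` and invented field names: every field that must be accessed under the lock lives inside the `mu` struct, so a reader of the type can see at a glance what the mutex protects.

```go
package main

import (
	"fmt"
	"sync"
)

// sstBatcher groups every mutex-protected field inside mu; field names
// here are illustrative, not the real batcher's.
type sstBatcher struct {
	name string // read only after Flush/Wait, so no locking needed

	mu struct {
		sync.Mutex
		totalRows int
		lastKey   string
	}
}

func (b *sstBatcher) recordFlush(rows int, key string) {
	b.mu.Lock()
	defer b.mu.Unlock()
	b.mu.totalRows += rows
	b.mu.lastKey = key
}

// run hammers recordFlush from several goroutines and returns the total.
func run() int {
	b := &sstBatcher{name: "demo"}
	var wg sync.WaitGroup
	for i := 0; i < 10; i++ {
		wg.Add(1)
		go func(i int) {
			defer wg.Done()
			b.recordFlush(5, fmt.Sprintf("key-%d", i))
		}(i)
	}
	wg.Wait()
	b.mu.Lock()
	defer b.mu.Unlock()
	return b.mu.totalRows
}

func main() {
	fmt.Println(run()) // 50
}
```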
pkg/kv/bulk/sst_batcher.go (outdated)

```go
// Flush sends the current batch, if any.
func (b *SSTBatcher) Flush(ctx context.Context) error {
	if err := b.grp.Wait(); err != nil {
```
quick comment that we're waiting for addsstable requests across ranges to return
think the name helps here?
Previously the batcher, when it determined it needed to finish one SST and send it before starting another, would wait for it to be sent before moving on. When flushing a buffer that contained data mapping to many ranges, this meant many serial flushes: e.g. flushing 512MB of data from a buffer whose keys were uniformly distributed over a table split into 2000 ranges meant waiting for roughly 2000 sequential AddSSTable requests. When those requests were slow, sometimes taking as much as 1s each or more, this became a major bottleneck.

This change switches the batcher to send files that are ended due to a range boundary asynchronously, queuing up the request to send and then starting the next file while it sends, as long as memory capacity in the monitor allows holding the extra file in memory (these async sends could use an entire extra buffer's worth of memory if they all end up in-flight at once, which they easily could if the receivers are queuing).

Release note (performance improvement): Bulk ingestion of unsorted data during IMPORT and schema changes uses a higher level of parallelism to send produced data to the storage layer.
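A rough sketch of the approach the commit message describes, with hypothetical names throughout (`flushDueToRangeBound`, a plain int64 budget standing in for the real memory monitor, and a byte counter standing in for AddSSTable): when a file is ended by a range boundary, the batcher tries to reserve room for an extra in-flight copy; if it can, the send happens on a goroutine while the next file is built, otherwise it falls back to the old synchronous send.

```go
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

type batcher struct {
	budget   int64 // bytes of extra in-flight files the "monitor" allows
	wg       sync.WaitGroup
	mu       sync.Mutex
	firstErr error
	sent     int64 // total bytes "sent" (stand-in metric)
}

// send stands in for issuing an AddSSTable request.
func (b *batcher) send(sst []byte) error {
	atomic.AddInt64(&b.sent, int64(len(sst)))
	return nil
}

// flushDueToRangeBound sends asynchronously when the budget allows holding
// the file in memory, and synchronously otherwise.
func (b *batcher) flushDueToRangeBound(sst []byte) error {
	if atomic.AddInt64(&b.budget, -int64(len(sst))) < 0 {
		// No room to hold another in-flight file: fall back to a sync send.
		atomic.AddInt64(&b.budget, int64(len(sst)))
		return b.send(sst)
	}
	b.wg.Add(1)
	go func() {
		defer b.wg.Done()
		defer atomic.AddInt64(&b.budget, int64(len(sst)))
		if err := b.send(sst); err != nil {
			b.mu.Lock()
			if b.firstErr == nil {
				b.firstErr = err
			}
			b.mu.Unlock()
		}
	}()
	return nil
}

// Flush waits for all in-flight sends, as the real Flush does via Wait().
func (b *batcher) Flush() error {
	b.wg.Wait()
	b.mu.Lock()
	defer b.mu.Unlock()
	return b.firstErr
}

func run() int64 {
	b := &batcher{budget: 1 << 20}
	for i := 0; i < 100; i++ {
		if err := b.flushDueToRangeBound(make([]byte, 1024)); err != nil {
			panic(err)
		}
	}
	if err := b.Flush(); err != nil {
		panic(err)
	}
	return atomic.LoadInt64(&b.sent)
}

func main() {
	fmt.Println(run()) // 102400
}
```

The real implementation uses ctxgroup and a memory monitor rather than a WaitGroup and a raw counter, but the shape is the same: reserve, send async, release on completion, and have Flush wait on the group before reporting errors.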
adityamaru left a comment:
LGTM sans one question.
```diff
-func (b *SSTBatcher) Close() {
+func (b *SSTBatcher) Close(ctx context.Context) {
 	b.sstWriter.Close()
 	if err := b.asyncAddSSTs.Wait(); err != nil {
```
Swallowing the error is okay because we expect every user of the sstbatcher to flush before closing? Should we return an error, or maybe fatal, if we see an error here instead, so that in case there is an AddSSTable request that we have not waited for, and it fails, we don't just carry on silently?
Yeah, if you don't Flush before Close, it is already the case that anything in sstWriter that hasn't been sent is lost, so you already have to Flush and check its error if you want to know whether everything you passed to Add was written successfully. So this one seems moot: we only wait here so we don't leak goroutines, and we don't care about the result.
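The contract described in this exchange can be illustrated with a hypothetical stand-in batcher (`fakeBatcher` and its methods are invented for illustration): Flush is the only call that reports whether data landed; Close just releases resources and silently drops anything still buffered.

```go
package main

import (
	"context"
	"fmt"
)

// fakeBatcher mimics the Flush/Close contract discussed above.
type fakeBatcher struct {
	buffered, flushed int
	closed            bool
}

func (b *fakeBatcher) Add(n int) { b.buffered += n }

func (b *fakeBatcher) Flush(ctx context.Context) error {
	b.flushed += b.buffered
	b.buffered = 0
	return nil
}

// Close drops b.buffered without error, mirroring the behavior described.
func (b *fakeBatcher) Close(ctx context.Context) { b.closed = true }

func run() (int, bool) {
	ctx := context.Background()
	b := &fakeBatcher{}
	b.Add(10)
	// A caller that needs to know everything landed must Flush and check
	// the error before Close; Close alone would lose the 10 buffered rows.
	if err := b.Flush(ctx); err != nil {
		panic(err)
	}
	b.Close(ctx)
	return b.flushed, b.closed
}

func main() {
	flushed, closed := run()
	fmt.Println(flushed, closed) // 10 true
}
```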
TFTR! bors r+

Build failed:

backupccl timeout with no stacks in kv/bulk. bors r+

Build succeeded:
80467: kv/bulk: pull lastRange out of mu and always pass monitor r=dt a=dt

Follow-up work from #79967. Pulling lastRange out of the mutex and only updating it on sync flushes simplifies the locking, since we don't need its result for range-flushes anyway (we flushed because we were done with that range). Additionally, the original patch added a mem monitor and used it unconditionally; however, some external creators of the batcher (restore, stream ingest) make their own batchers and were not populating it. The second commit fixes that.

Co-authored-by: David Taylor <tinystatemachine@gmail.com>


Addresses #79615