storage,colfetcher: implement local fast-path for COL_BATCH_RESPONSE #95033
craig[bot] merged 1 commit into cockroachdb:master
Conversation
michae2 left a comment
Reviewed 19 of 38 files at r1, all commit messages.
Reviewable status: complete! 0 of 0 LGTMs obtained (waiting on @DrewKimball, @sumeerbhola, and @yuzefovich)
pkg/kv/kvpb/api.proto line 645 at r1 (raw file):
```
// If set, rows and batch_responses will not be set.
// TODO(feedback wanted): should this be nullable? This will decrease the size
```
useless blathering: On one hand, I would expect the majority of ScanRequests to cross the network and need serialization, suggesting that we should optimize for that more common remote case and set this nullable. On the other hand, those remote requests will always have network latency anyway, so maybe it would be better to optimize the local case where it could actually matter, by keeping this not nullable. Eh, I doubt it makes a big difference either way.
pkg/kv/kvserver/batcheval/cmd_scan.go line 76 at r1 (raw file):
```
}
if len(scanRes.ColBatches) > 0 {
	reply.ColBatches.ColBatches = scanRes.ColBatches
```
So I guess this works because rpc.internalClientAdapter bypasses gRPC altogether, and just hands the *kvpb.BatchResponse to the kvclient, is that right? Pretty cool.
pkg/sql/colfetcher/cfetcher_wrapper.go line 116 at r1 (raw file):
```
prevBatchMemUsage := c.fetcherAcc.Used()
if prevBatchMemUsage > 0 {
	if err := c.converterAcc.Grow(ctx, prevBatchMemUsage); err != nil {
```
When does this shrink?
pkg/sql/colfetcher/cfetcher_wrapper.go line 200 at r1 (raw file):
```
// serialize the response.
for _, t := range tableArgs.typs {
	if t.Family() == types.EnumFamily {
```
question: From looking at the definition of types.(*T).IsHydrated I'm wondering, should this check (and other similar checks) be t.UserDefined() instead of t.Family() == types.EnumFamily?
pkg/storage/col_mvcc.go line 487 at r1 (raw file):
```
	res.KVData = append(res.KVData, b)
} else {
	res.ColBatches = append(res.ColBatches, colBatch)
```
Should we add some kind of assertion here that all previous results from wrapper.NextBatch in this loop were also unserialized? (Is this an invariant?)
DrewKimball left a comment
Reviewed 38 of 38 files at r1, all commit messages.
Reviewable status: complete! 0 of 0 LGTMs obtained (waiting on @sumeerbhola and @yuzefovich)
pkg/sql/colexec/colexecutils/deserializer.go line 32 at r1 (raw file):
```
batch coldata.Batch
data  []array.Data
```
Should we account for this?
pkg/sql/colfetcher/cfetcher_wrapper.go line 79 at r1 (raw file):
```
// only grows / shrinks the account according to its own usage and never
// relies on the total used value.
converterAcc *mon.BoundAccount
```
[nit] Could you rename this field since it isn't just used for the converter now?
pkg/sql/colfetcher/cfetcher_wrapper.go line 114 at r1 (raw file):
```
// of the batch that was returned on the previous call to NextBatch
// before we call nextBatchAdapter.
prevBatchMemUsage := c.fetcherAcc.Used()
```
What if we passed a detached allocator to cFetcher, and then made it the responsibility of the cFetcherWrapper to account for every batch? Would that simplify the logic here? If serializing, we could just adjust by the difference with the previous batch's memory on each call to fetcher.NextBatch.
pkg/sql/colfetcher/cfetcher_wrapper.go line 264 at r1 (raw file):
```
) ([]coldata.Batch, error) {
	// This memory monitor is not connected to the memory accounting system
	// since the accounting for these batches will be done by the SQL client.
```
This monitor/bound account is basically a no-op, right? Could we just pass a nil BoundAccount to NewAllocator?
pkg/sql/colmem/allocator.go line 720 at r1 (raw file):
```
// ResetMaybeReallocate call. At the moment, it can only be set by the
// SetAccountingHelper.
noBatchReuse bool
```
[nit] Here and elsewhere, a name like alwaysReallocate feels a little more straightforward.
pkg/sql/row/kv_batch_fetcher.go line 547 at r1 (raw file):
```
// under-accounting in most places, this seems acceptable.
func popBatch(
	batches [][]byte, colBatches []coldata.Batch,
```
[nit] Seems a little nicer to just have a second function that does the same thing for colBatches, but up to you.
yuzefovich left a comment
Reviewable status: complete! 0 of 0 LGTMs obtained (waiting on @DrewKimball, @michae2, and @sumeerbhola)
pkg/kv/kvpb/api.proto line 645 at r1 (raw file):
Previously, michae2 (Michael Erickson) wrote…
useless blathering: On one hand, I would expect the majority of ScanRequests to cross the network and need serialization, suggesting that we should optimize for that more common remote case and set this nullable. On the other hand, those remote requests will always have network latency anyway, so maybe it would be better to optimize the local case where it could actually matter, by keeping this not nullable. Eh, I doubt it makes a big difference either way.
Probably you're right that it's likely not to matter much. I'll keep this as non-nullable since the code seems nicer that way (the batch_responses and col_batches fields then look more similar).
pkg/kv/kvserver/batcheval/cmd_scan.go line 76 at r1 (raw file):
Previously, michae2 (Michael Erickson) wrote…
So I guess this works because `rpc.internalClientAdapter` bypasses gRPC altogether, and just hands the `*kvpb.BatchResponse` to the kvclient, is that right? Pretty cool.
Yes, precisely.
pkg/sql/colexec/colexecutils/deserializer.go line 32 at r1 (raw file):
Previously, DrewKimball (Drew Kimball) wrote…
Should we account for this?
This []array.Data seems negligible - array.Data seems to be about 88 bytes in size, and we have one object per vector in the batch, with these objects being reused on each Deserialize call.
pkg/sql/colfetcher/cfetcher_wrapper.go line 79 at r1 (raw file):
Previously, DrewKimball (Drew Kimball) wrote…
[nit] Could you rename this field since it isn't just used for the converter now?
Done.
pkg/sql/colfetcher/cfetcher_wrapper.go line 114 at r1 (raw file):
Previously, DrewKimball (Drew Kimball) wrote…
What if we passed a detached allocator to `cFetcher`, and then made it the responsibility of the `cFetcherWrapper` to account for every batch? Would that simplify the logic here? If serializing, we could just adjust by the difference with the previous batch's memory on each call to `fetcher.NextBatch`.
I like it, done.
pkg/sql/colfetcher/cfetcher_wrapper.go line 116 at r1 (raw file):
Previously, michae2 (Michael Erickson) wrote…
When does this shrink?
It doesn't shrink until we're done processing the ScanRequest (at which point the account is closed). This is by design - all coldata.Batches stay live (they are accumulated into a slice), so we just move the accounting for each batch from the fetcher account into the converter account, but we do need to keep the accounting live until the very end.
pkg/sql/colfetcher/cfetcher_wrapper.go line 200 at r1 (raw file):
Previously, michae2 (Michael Erickson) wrote…
question: From looking at the definition of `types.(*T).IsHydrated` I'm wondering, should this check (and other similar checks) be `t.UserDefined()` instead of `t.Family() == types.EnumFamily`?
Enums are the only user-defined type that we support (this is checked in colbuilder/execplan.go). This wasn't well documented, so I added some comments and an assertion.
pkg/sql/colfetcher/cfetcher_wrapper.go line 264 at r1 (raw file):
Previously, DrewKimball (Drew Kimball) wrote…
This monitor/bound account is basically a no-op, right? Could we just pass a nil `BoundAccount` to `NewAllocator`?
Good point, done.
pkg/sql/colmem/allocator.go line 720 at r1 (raw file):
Previously, DrewKimball (Drew Kimball) wrote…
[nit] Here and elsewhere, a name like `alwaysReallocate` feels a little more straightforward.
Done.
pkg/sql/row/kv_batch_fetcher.go line 547 at r1 (raw file):
Previously, DrewKimball (Drew Kimball) wrote…
[nit] Seems a little nicer to just have a second function that does the same thing for colBatches, but up to you.
This would result in more code overall (since we'd need to effectively push the `if` out of this function into the call sites), so I'll keep it as is.
pkg/storage/col_mvcc.go line 487 at r1 (raw file):
Previously, michae2 (Michael Erickson) wrote…
Should we add some kind of assertion here that all previous results from `wrapper.NextBatch` in this loop were also unserialized? (Is this an invariant?)
Good point, added assertions and adjusted the comments.
DrewKimball left a comment
Reviewed 28 of 28 files at r2, all commit messages.
Reviewable status: complete! 1 of 0 LGTMs obtained (waiting on @michae2, @sumeerbhola, and @yuzefovich)
pkg/sql/colexec/colexecutils/deserializer.go line 32 at r1 (raw file):
Previously, yuzefovich (Yahor Yuzefovich) wrote…
This `[]array.Data` seems negligible - `array.Data` seems to be about 88 bytes in size, and we have one object per vector in the batch, with these objects being reused on each `Deserialize` call.
I see, didn't realize array.Data is small. Sounds good to me.
I haven't had a chance to look again yet, but I don't think you need to wait for me (unless you really want to). Please feel free to merge!

Ack, thanks for the reviews! bors r+

Build failed:

Flake in UI tests. bors r+

Build failed (retrying...):

Build failed (retrying...):

This flake seems legitimate. bors r+

Already running a review

bors r-

Canceled.
I made a minor change to include the start key as the error detail. bors r+

Build succeeded:
This commit implements the local fast-path for the COL_BATCH_RESPONSE
scan format. The idea is that if a Scan request is evaluated locally
(i.e. on the same node for single-tenant deployments or within the
shared process for multi-tenant deployments), then we can avoid
the redundant serialization of the columnar batches in the Apache
Arrow format and just pass the batches as a slice of pointers through
the protobuf. Additionally, this also allows us to avoid a copy of the
data from `ScanResponse.BatchResponse` into the columnar batch.

To achieve this the ScanResponses and the ReverseScanResponses now
contain a new custom `ColBatches` message which only includes
`[]coldata.Batch` that is not marshalled as part of the protobuf
serialization.
Now that we can have a single multi-range request result in locally- and
remotely-executed single-range requests, we need to be careful when
combining them. In particular, in order to preserve the ordering between
single-range requests we now always deserialize the remotely-executed
ones (since this "combining" happens on the KV client side and won't be
sent over the wire again) while "merging" them accordingly. This
required introduction of an injected helper for the deserialization from
the Apache Arrow format into the `kvpb` package. This deserialization
also required that we have access to the `fetchpb.IndexFetchSpec` proto
that is stored in the BatchRequest, thus, the signature of the `combine`
method has been adjusted to include a reference to the BatchRequest.
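The order-preserving part of the combine step can be sketched as follows. This is an illustrative Go model, not the actual `kvpb` code: `response`, `deserialize`, and `combine` are simplified stand-ins (the real deserialization helper is injected and produces `coldata.Batch`es), but the invariant is the same - once any side carries in-memory batches, the serialized side must be deserialized so the merged result stays in request order.

```go
package main

import "fmt"

// response is a simplified stand-in for a single-range response: it carries
// either serialized Apache Arrow bytes (remote execution) or in-memory
// batches (local fast-path), here modeled as strings.
type response struct {
	serialized [][]byte
	colBatches []string
}

// deserialize stands in for the injected Arrow deserialization helper.
func deserialize(b []byte) string { return string(b) }

// combine merges two single-range responses while preserving request order.
// If both are fully serialized, plain concatenation suffices; otherwise the
// serialized side is deserialized so the merged response is uniformly
// in-memory (it won't cross the wire again on the KV client side).
func combine(left, right response) response {
	if len(left.colBatches) == 0 && len(right.colBatches) == 0 {
		return response{serialized: append(left.serialized, right.serialized...)}
	}
	var out response
	for _, r := range []response{left, right} {
		for _, b := range r.serialized {
			out.colBatches = append(out.colBatches, deserialize(b))
		}
		out.colBatches = append(out.colBatches, r.colBatches...)
	}
	return out
}

func main() {
	remote := response{serialized: [][]byte{[]byte("batch1")}}
	local := response{colBatches: []string{"batch2"}}
	fmt.Println(combine(remote, local).colBatches) // [batch1 batch2]
}
```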
An additional quirk of this commit is that the `cFetcher` cannot reuse
the same batch when it is used by the `cFetcherWrapper` with
serialization skipped. (If it did reuse batches, then the slice of
batches would contain multiple references to the same batch, so only the
last reference would be correct - all previous ones would have been
reset.) To support this, the `colmem.SetAccountingHelper` has been
adjusted to keep the same batch-sizing heuristic while always allocating
a new batch, even when it would otherwise have reused the old one.
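The aliasing hazard described above can be shown with a minimal sketch. Nothing here is CockroachDB API - `batch` and `fetchAll` are toy stand-ins for `coldata.Batch` and the fetch loop - but it demonstrates why accumulating a reused batch into a slice keeps only the last batch's contents:

```go
package main

import "fmt"

// batch is a toy stand-in for coldata.Batch.
type batch []int

// fetchAll simulates accumulating every produced batch into a slice.
// With reuse=true each iteration refills the same underlying batch, so all
// accumulated entries alias the same memory and only the last values
// survive; with reuse=false (the cFetcherWrapper behavior when skipping
// serialization) a fresh batch is allocated on every iteration.
func fetchAll(rows [][]int, reuse bool) []batch {
	var out []batch
	var b batch
	for _, r := range rows {
		if b == nil || !reuse {
			b = make(batch, len(r))
		}
		copy(b, r)
		out = append(out, b)
	}
	return out
}

func main() {
	rows := [][]int{{1}, {2}, {3}}
	fmt.Println(fetchAll(rows, true))  // [[3] [3] [3]] - every entry aliases the last batch
	fmt.Println(fetchAll(rows, false)) // [[1] [2] [3]] - each entry is distinct
}
```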
It's also worth noting the story about memory accounting of these local
batches. The `SetAccountingHelper` used by the `cFetcher` only ever
tracks the memory usage of the last batch, so we need to account for all
other batches ourselves. We get around this by providing the `cFetcher`
with a "detached" memory account (i.e. an account that is not connected
to the memory accounting system) that the `cFetcher` uses to limit the
batch size based on its footprint, and by modifying the
`cFetcherWrapper` to perform the accounting against the proper memory
account. This commit also clarifies the contract of
`CFetcherWrapper.NextBatch`: it is the wrapper's responsibility to
perform memory accounting of all batches, regardless of the return
format, against the provided memory account.
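The wrapper's two accounting modes can be sketched like this. `account` and `accountForBatch` are simplified stand-ins (the real type is `mon.BoundAccount`), but they capture the asymmetry: when serializing, only the current batch is live, so the account is adjusted by the difference with the previous batch; when returning batches directly, every batch stays live until the ScanRequest completes, so the account only grows.

```go
package main

import "fmt"

// account is a simplified stand-in for mon.BoundAccount.
type account struct{ used int64 }

func (a *account) grow(n int64)           { a.used += n }
func (a *account) resize(prev, cur int64) { a.used += cur - prev }

// accountForBatch sketches the cFetcherWrapper's responsibility: the
// cFetcher itself runs against a detached account (used only for batch
// sizing), and the wrapper does the real accounting.
func accountForBatch(acc *account, prevSize, curSize int64, serializing bool) {
	if serializing {
		// Only the latest batch is live; track the delta.
		acc.resize(prevSize, curSize)
	} else {
		// All batches accumulate and stay live; track them all.
		acc.grow(curSize)
	}
}

func main() {
	var serAcc, localAcc account
	sizes := []int64{100, 150, 120}
	var prev int64
	for _, s := range sizes {
		accountForBatch(&serAcc, prev, s, true)
		accountForBatch(&localAcc, 0, s, false)
		prev = s
	}
	fmt.Println(serAcc.used)   // 120: only the last batch is accounted
	fmt.Println(localAcc.used) // 370: all three batches are accounted
}
```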
That only covers the KV server side of the story. On the KV client side
the memory accounting is done in `txnKVFetcher`. When the batches are
serialized, they are included in the `ScanResponse.BatchResponse` field
and, thus, are included in `BatchResponse.Size` which we use for
accounting. For the non-serialized batches this commit implements a
custom `Size()` method so that the true footprint of all
`coldata.Batch`es is included in `BatchResponse.Size`. As a result, all
local batches (including the ones that were deserialized when combining
responses to locally- and remotely-executed requests) are tracked by the
`txnKVFetcher` until a new `BatchRequest` is issued, so the
ColBatchDirectScan doesn't need to perform the accounting. (Note that we
perform the accounting for `ScanResponse.BatchResponse` slices in a
similar manner - we don't shrink the memory account when a single
response becomes garbage, due to likely under-accounting in other
places.)
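The shape of that custom `Size()` method can be sketched as follows. The types here are illustrative stand-ins, not the real `kvpb`/`coldata` definitions: the point is that serialized data is already covered by the byte slice length, while local batches must report their true in-memory footprint so the client-side `txnKVFetcher` sees them through `BatchResponse.Size`.

```go
package main

import "fmt"

// colBatch stands in for coldata.Batch; footprint stands in for its
// in-memory size as reported by the vectorized memory-estimation code.
type colBatch struct{ footprint int64 }

// scanResponse stands in for ScanResponse: it carries either serialized
// Arrow bytes or local, non-serialized batches.
type scanResponse struct {
	batchResponse []byte
	colBatches    []colBatch
}

// size mirrors the idea of the custom Size() method: len() already reflects
// the serialized footprint, and local batches contribute their true size.
func (r scanResponse) size() int64 {
	n := int64(len(r.batchResponse))
	for _, b := range r.colBatches {
		n += b.footprint
	}
	return n
}

func main() {
	local := scanResponse{colBatches: []colBatch{{footprint: 4096}, {footprint: 8192}}}
	remote := scanResponse{batchResponse: make([]byte, 1024)}
	fmt.Println(local.size())  // 12288
	fmt.Println(remote.size()) // 1024
}
```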
A special note on type schemas with enums: since enums require type
hydration that is not easily available on the KV server side and we
treat them simply as bytes values, the presence of enums forces us to
serialize the batches even for locally-executed requests. This seems
like a minor limitation in comparison to not supporting enums at all.
Another note on the datum-backed vectors: since the `cFetcherWrapper`
also doesn't have access to a valid `eval.Context`, the datum-backed
vectors produced by the wrapper are "incomplete". Previously, since we
always serialized the batches, this wasn't an issue. However, now if we
get a non-serialized batch from a locally-executed request, we must
update all datum-backed vectors with the proper eval context. This is
done by the `ColBatchDirectScan`.

The microbenchmarks of this change when the direct columnar scans are
always enabled are here:
https://gist.github.com/yuzefovich/a9b28669f35ff658b2e89ed7b1d43e38
Note that there are three distinct operation modes in that gist:
- `Cockroach` and `MultinodeCockroach` - single-tenant deployments
- `SharedProcessTenant` - this is how we imagine that dedicated clusters
  will run once the Unified Architecture is achieved
- `SepProcessTenant` - this is how we run Serverless.

For the first two this commit results mostly in a minor improvement in
latency and sometimes a noticeable reduction in allocations, as expected.
The `SepProcessTenant` config - which cannot take advantage of the local
fast-path - sees a minor slowdown in latency and no changes in
allocations, as expected (I'm attributing this to the increased overhead
of the direct columnar scans and the increased size of `ScanResponse`
objects). However, these are micro-benchmarks, and they don't show the full
picture. In particular, they don't process enough data and often select
all columns in the table for this feature to show its benefits. I'm more
excited about the results on the TPC-H queries. Here is the impact of
this commit on a 3-node cluster running in single-tenant mode (averaged
over 10 runs):

```
Q1:  before: 4.46s  after: 4.23s   -5.15%
Q2:  before: 3.18s  after: 3.30s    3.45%
Q3:  before: 2.43s  after: 2.11s  -13.20%
Q4:  before: 1.83s  after: 1.84s    0.44%
Q5:  before: 2.65s  after: 2.48s   -6.34%
Q6:  before: 7.59s  after: 7.46s   -1.65%
Q7:  before: 5.56s  after: 5.72s    2.71%
Q8:  before: 1.14s  after: 1.11s   -2.29%
Q9:  before: 5.77s  after: 5.31s   -7.86%
Q10: before: 1.98s  after: 1.94s   -1.92%
Q11: before: 0.73s  after: 0.69s   -5.52%
Q12: before: 7.18s  after: 6.91s   -3.79%
Q13: before: 1.24s  after: 1.24s    0.16%
Q14: before: 0.70s  after: 0.66s   -5.32%
Q15: before: 3.99s  after: 3.64s   -8.89%
Q16: before: 0.95s  after: 0.94s   -1.16%
Q17: before: 0.27s  after: 0.26s   -5.49%
Q18: before: 2.67s  after: 2.15s  -19.39%
Q19: before: 4.03s  after: 2.96s  -26.46%
Q20: before: 12.91s after: 11.49s -10.98%
Q21: before: 7.14s  after: 6.99s   -2.13%
Q22: before: 0.60s  after: 0.57s   -5.48%
```
Furthermore, here is the comparison of the direct columnar scans
disabled vs enabled:

```
Q1:  before: 4.36s  after: 4.23s  -2.91%
Q2:  before: 3.57s  after: 3.30s  -7.63%
Q3:  before: 2.31s  after: 2.11s  -8.61%
Q4:  before: 1.88s  after: 1.84s  -2.07%
Q5:  before: 2.55s  after: 2.48s  -2.70%
Q6:  before: 7.94s  after: 7.46s  -6.04%
Q7:  before: 5.87s  after: 5.72s  -2.61%
Q8:  before: 1.12s  after: 1.11s  -1.07%
Q9:  before: 5.79s  after: 5.31s  -8.27%
Q10: before: 1.97s  after: 1.94s  -1.47%
Q11: before: 0.69s  after: 0.69s  -0.29%
Q12: before: 6.99s  after: 6.91s  -1.16%
Q13: before: 1.24s  after: 1.24s  -0.48%
Q14: before: 0.68s  after: 0.66s  -3.37%
Q15: before: 3.72s  after: 3.64s  -2.23%
Q16: before: 0.96s  after: 0.94s  -1.88%
Q17: before: 0.28s  after: 0.26s  -6.18%
Q18: before: 2.47s  after: 2.15s -12.87%
Q19: before: 3.20s  after: 2.96s  -7.35%
Q20: before: 11.71s after: 11.49s -1.88%
Q21: before: 7.00s  after: 6.99s  -0.06%
Q22: before: 0.58s  after: 0.57s  -2.07%
```
In other words, on TPC-H queries it is now already beneficial to
enable the direct columnar scans in single-tenant world (and I think
there are more minor optimizations ahead). For reference, here is the
comparison of direct columnar scans disabled vs enabled on this commit:
https://gist.github.com/yuzefovich/0afce5c0692713cf28712f076bab415b
It also shows that we're not that far off from reaching performance
parity in micro-benchmarks.
Addresses: #82323.
Informs: #87610.
Epic: CRDB-14837
Release note: None